diff --git a/synapse/__init__.py b/synapse/__init__.py
index c3016fc6ed..6d1c6d6f72 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
-__version__ = "1.36.0"
+__version__ = "1.37.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index cf4333a923..edf1b918eb 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -92,11 +92,8 @@ class Auth:
async def check_from_context(
self, room_version: str, event, context, do_sig_check=True
) -> None:
- prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = self.compute_auth_events(
- event, prev_state_ids, for_verification=True
- )
- auth_events_by_id = await self.store.get_events(auth_events_ids)
+ auth_event_ids = event.auth_event_ids()
+ auth_events_by_id = await self.store.get_events(auth_event_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index ca13843680..414e4c019a 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -65,6 +65,12 @@ class JoinRules:
MSC3083_RESTRICTED = "restricted"
+class RestrictedJoinRuleTypes:
+ """Understood types for the allow rules in restricted join rules."""
+
+ ROOM_MEMBERSHIP = "m.room_membership"
+
+
class LoginType:
PASSWORD = "m.login.password"
EMAIL_IDENTITY = "m.login.email.identity"
@@ -113,6 +119,9 @@ class EventTypes:
SpaceChild = "m.space.child"
SpaceParent = "m.space.parent"
+ MSC2716_INSERTION = "org.matrix.msc2716.insertion"
+ MSC2716_MARKER = "org.matrix.msc2716.marker"
+
class ToDeviceEventTypes:
RoomKeyRequest = "m.room_key_request"
@@ -179,6 +188,18 @@ class EventContentFields:
# cf https://github.com/matrix-org/matrix-doc/pull/1772
ROOM_TYPE = "type"
+ # Used on normal messages to indicate they were historically imported after the fact
+ MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
+ # For "insertion" events
+ MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
+ # Used on normal message events to indicate where the chunk connects to
+ MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
+ # For "marker" events
+ MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
+ MSC2716_MARKER_INSERTION_PREV_EVENTS = (
+ "org.matrix.msc2716.marker.insertion_prev_events"
+ )
+
class RoomEncryptionAlgorithms:
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
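
A hedged sketch of how the MSC2716 content fields above might appear in practice. The chunk ID and message body are illustrative values, not ones mandated by the MSC, and the exact event shapes remain experimental.

```python
from synapse.api.constants import EventContentFields

# An "insertion" event advertises the chunk ID that the next batch of
# historical messages should connect to (value is illustrative).
insertion_content = {
    EventContentFields.MSC2716_NEXT_CHUNK_ID: "chunk-abc",
}

# A message imported after the fact is flagged as historical, and carries
# the chunk ID indicating where it connects to.
historical_message_content = {
    "msgtype": "m.text",
    "body": "an imported message",
    EventContentFields.MSC2716_CHUNK_ID: "chunk-abc",
    EventContentFields.MSC2716_HISTORICAL: True,
}
```
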
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 1329af2e2b..8879136881 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -26,7 +26,9 @@ from typing import Awaitable, Callable, Iterable
from cryptography.utils import CryptographyDeprecationWarning
from typing_extensions import NoReturn
+import twisted
from twisted.internet import defer, error, reactor
+from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
@@ -35,10 +37,10 @@ from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
+from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
-from synapse.util.async_helpers import Linearizer
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -112,8 +114,6 @@ def start_reactor(
run_command (Callable[]): callable that actually runs the reactor
"""
- install_dns_limiter(reactor)
-
def run():
logger.info("Running")
setup_jemalloc_stats()
@@ -141,7 +141,7 @@ def start_reactor(
def quit_with_error(error_string: str) -> NoReturn:
message_lines = error_string.split("\n")
- line_length = max(len(line) for line in message_lines if len(line) < 80) + 2
+ line_length = min(max(len(line) for line in message_lines), 80) + 2
sys.stderr.write("*" * line_length + "\n")
for line in message_lines:
sys.stderr.write(" %s\n" % (line.rstrip(),))
@@ -149,6 +149,30 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)
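
The `line_length` change above fixes a latent crash: the old expression filtered out lines of 80+ characters before taking `max()`, so a message made up entirely of long lines produced an empty sequence. A small worked example:

```python
# With the old expression, every line being >= 80 chars leaves the generator
# empty and max() raises ValueError; the new expression caps the banner
# width at 80 instead.
message_lines = ["x" * 100]
# old: max(len(line) for line in message_lines if len(line) < 80) + 2  -> ValueError
line_length = min(max(len(line) for line in message_lines), 80) + 2
assert line_length == 82
```
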
+def handle_startup_exception(e: Exception) -> NoReturn:
+ # Exceptions that occur between setting up the logging and forking or starting
+ # the reactor are written to the logs, followed by a summary to stderr.
+ logger.exception("Exception during startup")
+ quit_with_error(
+ f"Error during initialisation:\n {e}\nThere may be more information in the logs."
+ )
+
+
+def redirect_stdio_to_logs() -> None:
+ streams = [("stdout", LogLevel.info), ("stderr", LogLevel.error)]
+
+ for (stream, level) in streams:
+ oldStream = getattr(sys, stream)
+ loggingFile = LoggingFile(
+ logger=twisted.logger.Logger(namespace=stream),
+ level=level,
+ encoding=getattr(oldStream, "encoding", None),
+ )
+ setattr(sys, stream, loggingFile)
+
+ print("Redirected stdout/stderr to logs")
+
+
def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
"""Register a callback with the reactor, to be called once it is running
@@ -292,8 +316,7 @@ async def start(hs: "synapse.server.HomeServer"):
"""
Start a Synapse server or worker.
- Should be called once the reactor is running and (if we're using ACME) the
- TLS certificates are in place.
+ Should be called once the reactor is running.
Will start the main HTTP listeners and do some other startup tasks, and then
notify systemd.
@@ -334,6 +357,14 @@ async def start(hs: "synapse.server.HomeServer"):
# Start the tracer
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa
+ # Instantiate the modules so they can register their web resources to the module API
+ # before we start the listeners.
+ module_api = hs.get_module_api()
+ for module, config in hs.config.modules.loaded_modules:
+ module(config=config, api=module_api)
+
+ load_legacy_spam_checkers(hs)
+
# It is now safe to start your Synapse.
hs.start_listening()
hs.get_datastore().db_pool.start_profiling()
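
Modules are instantiated here, before the listeners start, precisely so that anything they register through the module API is in place when `_module_web_resources` is consumed while building the resource tree. A hypothetical module illustrating that flow; `register_web_resource` is assumed to be the ModuleApi method that feeds that dict:

```python
from twisted.web.resource import Resource


class HelloResource(Resource):
    """Illustrative web resource served by a module."""

    isLeaf = True

    def render_GET(self, request):
        return b"hello from a module"


class MySuperModule:
    # Instantiated as module(config=config, api=module_api) during startup,
    # before hs.start_listening() runs.
    def __init__(self, config: dict, api):
        # Assumed ModuleApi method backing hs._module_web_resources.
        api.register_web_resource("/_synapse/client/hello", HelloResource())
```
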
@@ -398,107 +429,6 @@ def setup_sdnotify(hs):
)
-def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
- """Replaces the resolver with one that limits the number of in flight DNS
- requests.
-
- This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we
- can run out of file descriptors and infinite loop if we attempt to do too
- many DNS queries at once
-
- XXX: I'm confused by this. reactor.nameResolver does not use twisted.names unless
- you explicitly install twisted.names as the resolver; rather it uses a GAIResolver
- backed by the reactor's default threadpool (which is limited to 10 threads). So
- (a) I don't understand why twisted ticket 9620 is relevant, and (b) I don't
- understand why we would run out of FDs if we did too many lookups at once.
- -- richvdh 2020/08/29
- """
- new_resolver = _LimitedHostnameResolver(
- reactor.nameResolver, max_dns_requests_in_flight
- )
-
- reactor.installNameResolver(new_resolver)
-
-
-class _LimitedHostnameResolver:
- """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups."""
-
- def __init__(self, resolver, max_dns_requests_in_flight):
- self._resolver = resolver
- self._limiter = Linearizer(
- name="dns_client_limiter", max_count=max_dns_requests_in_flight
- )
-
- def resolveHostName(
- self,
- resolutionReceiver,
- hostName,
- portNumber=0,
- addressTypes=None,
- transportSemantics="TCP",
- ):
- # We need this function to return `resolutionReceiver` so we do all the
- # actual logic involving deferreds in a separate function.
-
- # even though this is happening within the depths of twisted, we need to drop
- # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
- # the logcontext if it returns an incomplete deferred; (b) _resolve will
- # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
- with PreserveLoggingContext():
- self._resolve(
- resolutionReceiver,
- hostName,
- portNumber,
- addressTypes,
- transportSemantics,
- )
-
- return resolutionReceiver
-
- @defer.inlineCallbacks
- def _resolve(
- self,
- resolutionReceiver,
- hostName,
- portNumber=0,
- addressTypes=None,
- transportSemantics="TCP",
- ):
-
- with (yield self._limiter.queue(())):
- # resolveHostName doesn't return a Deferred, so we need to hook into
- # the receiver interface to get told when resolution has finished.
-
- deferred = defer.Deferred()
- receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
-
- self._resolver.resolveHostName(
- receiver, hostName, portNumber, addressTypes, transportSemantics
- )
-
- yield deferred
-
-
-class _DeferredResolutionReceiver:
- """Wraps a IResolutionReceiver and simply resolves the given deferred when
- resolution is complete
- """
-
- def __init__(self, receiver, deferred):
- self._receiver = receiver
- self._deferred = deferred
-
- def resolutionBegan(self, resolutionInProgress):
- self._receiver.resolutionBegan(resolutionInProgress)
-
- def addressResolved(self, address):
- self._receiver.addressResolved(address)
-
- def resolutionComplete(self):
- self._deferred.callback(())
- self._receiver.resolutionComplete()
-
-
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 57c2fc2e88..af8a1833f3 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -32,7 +32,12 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
-from synapse.app._base import max_request_body_size, register_start
+from synapse.app._base import (
+ handle_startup_exception,
+ max_request_body_size,
+ redirect_stdio_to_logs,
+ register_start,
+)
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -354,6 +359,10 @@ class GenericWorkerServer(HomeServer):
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+ # Attach additional resources registered by modules.
+ resources.update(self._module_web_resources)
+ self._module_web_resources_consumed = True
+
root_resource = create_resource_tree(resources, OptionsResource())
_base.listen_tcp(
@@ -465,14 +474,21 @@ def start(config_options):
setup_logging(hs, config, use_worker_options=True)
- hs.setup()
+ try:
+ hs.setup()
- # Ensure the replication streamer is always started in case we write to any
- # streams. Will no-op if no streams can be written to by this worker.
- hs.get_replication_streamer()
+ # Ensure the replication streamer is always started in case we write to any
+ # streams. Will no-op if no streams can be written to by this worker.
+ hs.get_replication_streamer()
+ except Exception as e:
+ handle_startup_exception(e)
register_start(_base.start, hs)
+ # redirect stdio to the logs, if configured.
+ if not hs.config.no_redirect_stdio:
+ redirect_stdio_to_logs()
+
_base.start_worker_reactor("synapse-generic-worker", config)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b2501ee4d7..7af56ac136 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -37,10 +37,11 @@ from synapse.api.urls import (
)
from synapse.app import _base
from synapse.app._base import (
+ handle_startup_exception,
listen_ssl,
listen_tcp,
max_request_body_size,
- quit_with_error,
+ redirect_stdio_to_logs,
register_start,
)
from synapse.config._base import ConfigError
@@ -69,8 +70,6 @@ from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore
-from synapse.storage.engines import IncorrectDatabaseSetup
-from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.module_loader import load_module
from synapse.util.versionstring import get_version_string
@@ -124,6 +123,10 @@ class SynapseHomeServer(HomeServer):
)
resources[path] = resource
+ # Attach additional resources registered by modules.
+ resources.update(self._module_web_resources)
+ self._module_web_resources_consumed = True
+
# try to find something useful to redirect '/' to
if WEB_CLIENT_PREFIX in resources:
root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
@@ -358,60 +361,10 @@ def setup(config_options):
try:
hs.setup()
- except IncorrectDatabaseSetup as e:
- quit_with_error(str(e))
- except UpgradeDatabaseException as e:
- quit_with_error("Failed to upgrade database: %s" % (e,))
-
- async def do_acme() -> bool:
- """
- Reprovision an ACME certificate, if it's required.
-
- Returns:
- Whether the cert has been updated.
- """
- acme = hs.get_acme_handler()
-
- # Check how long the certificate is active for.
- cert_days_remaining = hs.config.is_disk_cert_valid(allow_self_signed=False)
-
- # We want to reprovision if cert_days_remaining is None (meaning no
- # certificate exists), or the days remaining number it returns
- # is less than our re-registration threshold.
- provision = False
-
- if (
- cert_days_remaining is None
- or cert_days_remaining < hs.config.acme_reprovision_threshold
- ):
- provision = True
-
- if provision:
- await acme.provision_certificate()
-
- return provision
-
- async def reprovision_acme():
- """
- Provision a certificate from ACME, if required, and reload the TLS
- certificate if it's renewed.
- """
- reprovisioned = await do_acme()
- if reprovisioned:
- _base.refresh_certificate(hs)
+ except Exception as e:
+ handle_startup_exception(e)
async def start():
- # Run the ACME provisioning code, if it's enabled.
- if hs.config.acme_enabled:
- acme = hs.get_acme_handler()
- # Start up the webservices which we will respond to ACME
- # challenges with, and then provision.
- await acme.start_listening()
- await do_acme()
-
- # Check if it needs to be reprovisioned every day.
- hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
-
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc_enabled:
oidc = hs.get_oidc_handler()
@@ -500,6 +453,11 @@ def main():
# check base requirements
check_requirements()
hs = setup(sys.argv[1:])
+
+ # redirect stdio to the logs, if configured.
+ if not hs.config.no_redirect_stdio:
+ redirect_stdio_to_logs()
+
run(hs)
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 08e2c2c543..d6ec618f8f 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -405,7 +405,6 @@ class RootConfig:
listeners=None,
tls_certificate_path=None,
tls_private_key_path=None,
- acme_domain=None,
):
"""
Build a default configuration file
@@ -457,9 +456,6 @@ class RootConfig:
tls_private_key_path (str|None): The path to the tls private key.
- acme_domain (str|None): The domain acme will try to validate. If
- specified acme will be enabled.
-
Returns:
str: the yaml config file
"""
@@ -477,7 +473,6 @@ class RootConfig:
listeners=listeners,
tls_certificate_path=tls_certificate_path,
tls_private_key_path=tls_private_key_path,
- acme_domain=acme_domain,
).values()
)
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index ff9abbc232..23ca0c83c1 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -11,11 +11,13 @@ from synapse.config import (
database,
emailconfig,
experimental,
+ federation,
groups,
jwt,
key,
logger,
metrics,
+ modules,
oidc,
password_auth_providers,
push,
@@ -85,6 +87,8 @@ class RootConfig:
thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig
tracer: tracer.TracerConfig
redis: redis.RedisConfig
+ modules: modules.ModulesConfig
+ federation: federation.FederationConfig
config_classes: List = ...
def __init__(self) -> None: ...
@@ -111,7 +115,6 @@ class RootConfig:
database_conf: Optional[Any] = ...,
tls_certificate_path: Optional[str] = ...,
tls_private_key_path: Optional[str] = ...,
- acme_domain: Optional[str] = ...,
): ...
@classmethod
def load_or_generate_config(cls, description: Any, argv: Any): ...
diff --git a/synapse/config/auth.py b/synapse/config/auth.py
index e10d641a96..53809cee2e 100644
--- a/synapse/config/auth.py
+++ b/synapse/config/auth.py
@@ -103,6 +103,10 @@ class AuthConfig(Config):
# the user-interactive authentication process, by allowing for multiple
# (and potentially different) operations to use the same validation session.
#
+ # This is ignored for potentially "dangerous" operations (including
+ # deactivating an account, modifying an account password, and
+ # adding a 3PID).
+ #
# Uncomment below to allow for credential validation to last for 15
# seconds.
#
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 6ebce4b2f7..7fb1f7021f 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -29,3 +29,6 @@ class ExperimentalConfig(Config):
# MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool
+
+ # MSC2716 (backfill existing history)
+ self.msc2716_enabled = experimental.get("msc2716_enabled", False) # type: bool
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 5ae0f55bcc..1f42a51857 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -1,5 +1,4 @@
-# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,6 +29,7 @@ from .jwt import JWTConfig
from .key import KeyConfig
from .logger import LoggingConfig
from .metrics import MetricsConfig
+from .modules import ModulesConfig
from .oidc import OIDCConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
@@ -56,6 +56,7 @@ from .workers import WorkerConfig
class HomeServerConfig(RootConfig):
config_classes = [
+ ModulesConfig,
ServerConfig,
TlsConfig,
FederationConfig,
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 813076dfe2..91d9bcf32e 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -259,9 +259,7 @@ def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) ->
finally:
threadlocal.active = False
- logBeginner.beginLoggingTo([_log], redirectStandardIO=not config.no_redirect_stdio)
- if not config.no_redirect_stdio:
- print("Redirected stdout/stderr to logs")
+ logBeginner.beginLoggingTo([_log], redirectStandardIO=False)
def _load_logging_config(log_config_path: str) -> None:
diff --git a/synapse/config/modules.py b/synapse/config/modules.py
new file mode 100644
index 0000000000..3209e1c492
--- /dev/null
+++ b/synapse/config/modules.py
@@ -0,0 +1,49 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Any, Dict, List, Tuple
+
+from synapse.config._base import Config, ConfigError
+from synapse.util.module_loader import load_module
+
+
+class ModulesConfig(Config):
+ section = "modules"
+
+ def read_config(self, config: dict, **kwargs):
+ self.loaded_modules: List[Tuple[Any, Dict]] = []
+
+ configured_modules = config.get("modules") or []
+ for i, module in enumerate(configured_modules):
+ config_path = ("modules", "<item %i>" % i)
+ if not isinstance(module, dict):
+ raise ConfigError("expected a mapping", config_path)
+
+ self.loaded_modules.append(load_module(module, config_path))
+
+ def generate_config_section(self, **kwargs):
+ return """
+ ## Modules ##
+
+ # Server admins can expand Synapse's functionality with external modules.
+ #
+ # See https://matrix-org.github.io/synapse/develop/modules.html for more
+ # documentation on how to configure or create custom modules for Synapse.
+ #
+ modules:
+ # - module: my_super_module.MySuperClass
+ # config:
+ # do_thing: true
+ # - module: my_other_super_module.SomeClass
+ # config: {}
+ """
diff --git a/synapse/config/spam_checker.py b/synapse/config/spam_checker.py
index 447ba3303b..d0311d6468 100644
--- a/synapse/config/spam_checker.py
+++ b/synapse/config/spam_checker.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
from typing import Any, Dict, List, Tuple
from synapse.config import ConfigError
@@ -19,6 +20,15 @@ from synapse.util.module_loader import load_module
from ._base import Config
+logger = logging.getLogger(__name__)
+
+LEGACY_SPAM_CHECKER_WARNING = """
+This server is using a spam checker module that is implementing the deprecated spam
+checker interface. Please check with the module's maintainer to see if a new version
+supporting Synapse's generic modules system is available.
+For more information, please see https://matrix-org.github.io/synapse/develop/modules.html
+---------------------------------------------------------------------------------------"""
+
class SpamCheckerConfig(Config):
section = "spamchecker"
@@ -43,17 +53,7 @@ class SpamCheckerConfig(Config):
else:
raise ConfigError("spam_checker syntax is incorrect")
- def generate_config_section(self, **kwargs):
- return """\
- # Spam checkers are third-party modules that can block specific actions
- # of local users, such as creating rooms and registering undesirable
- # usernames, as well as remote users by redacting incoming events.
- #
- spam_checker:
- #- module: "my_custom_project.SuperSpamChecker"
- # config:
- # example_option: 'things'
- #- module: "some_other_project.BadEventStopper"
- # config:
- # example_stop_events_from: ['@bad:example.com']
- """
+ # If this configuration is being used in any way, warn the admin that it is going
+ # away soon.
+ if self.spam_checkers:
+ logger.warning(LEGACY_SPAM_CHECKER_WARNING)
diff --git a/synapse/config/sso.py b/synapse/config/sso.py
index af645c930d..e4346e02aa 100644
--- a/synapse/config/sso.py
+++ b/synapse/config/sso.py
@@ -74,6 +74,10 @@ class SSOConfig(Config):
self.sso_client_whitelist = sso_config.get("client_whitelist") or []
+ self.sso_update_profile_information = (
+ sso_config.get("update_profile_information") or False
+ )
+
# Attempt to also whitelist the server's login fallback, since that fallback sets
# the redirect URL to itself (so it can process the login token then return
# gracefully to the client). This would make it pointless to ask the user for
@@ -111,6 +115,17 @@ class SSOConfig(Config):
# - https://riot.im/develop
# - https://my.custom.client/
+ # Uncomment to keep a user's profile fields in sync with information from
+ # the identity provider. Currently only syncing the displayname is
+ # supported. Fields are checked on every SSO login, and are updated
+ # if necessary.
+ #
+ # Note that enabling this option will override user profile information,
+ # regardless of whether users have opted out of syncing that
+ # information when first signing in. Defaults to false.
+ #
+ #update_profile_information: true
+
# Directory in which Synapse will try to find the template files below.
# If not set, or the files named below are not found within the template
# directory, default templates from within the Synapse package will be used.
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 0e9bba53c9..9a16a8fbae 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -14,7 +14,6 @@
import logging
import os
-import warnings
from datetime import datetime
from typing import List, Optional, Pattern
@@ -26,45 +25,12 @@ from synapse.util import glob_to_regex
logger = logging.getLogger(__name__)
-ACME_SUPPORT_ENABLED_WARN = """\
-This server uses Synapse's built-in ACME support. Note that ACME v1 has been
-deprecated by Let's Encrypt, and that Synapse doesn't currently support ACME v2,
-which means that this feature will not work with Synapse installs set up after
-November 2019, and that it may stop working on June 2020 for installs set up
-before that date.
-
-For more info and alternative solutions, see
-https://github.com/matrix-org/synapse/blob/master/docs/ACME.md#deprecation-of-acme-v1
---------------------------------------------------------------------------------"""
-
class TlsConfig(Config):
section = "tls"
def read_config(self, config: dict, config_dir_path: str, **kwargs):
- acme_config = config.get("acme", None)
- if acme_config is None:
- acme_config = {}
-
- self.acme_enabled = acme_config.get("enabled", False)
-
- if self.acme_enabled:
- logger.warning(ACME_SUPPORT_ENABLED_WARN)
-
- # hyperlink complains on py2 if this is not a Unicode
- self.acme_url = str(
- acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory")
- )
- self.acme_port = acme_config.get("port", 80)
- self.acme_bind_addresses = acme_config.get("bind_addresses", ["::", "0.0.0.0"])
- self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
- self.acme_domain = acme_config.get("domain", config.get("server_name"))
-
- self.acme_account_key_file = self.abspath(
- acme_config.get("account_key_file", config_dir_path + "/client.key")
- )
-
self.tls_certificate_file = self.abspath(config.get("tls_certificate_path"))
self.tls_private_key_file = self.abspath(config.get("tls_private_key_path"))
@@ -229,11 +195,9 @@ class TlsConfig(Config):
data_dir_path,
tls_certificate_path,
tls_private_key_path,
- acme_domain,
**kwargs,
):
- """If the acme_domain is specified acme will be enabled.
- If the TLS paths are not specified the default will be certs in the
+ """If the TLS paths are not specified the default will be certs in the
config directory"""
base_key_name = os.path.join(config_dir_path, server_name)
@@ -243,28 +207,15 @@ class TlsConfig(Config):
"Please specify both a cert path and a key path or neither."
)
- tls_enabled = (
- "" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
- )
+ tls_enabled = "" if tls_certificate_path and tls_private_key_path else "#"
if not tls_certificate_path:
tls_certificate_path = base_key_name + ".tls.crt"
if not tls_private_key_path:
tls_private_key_path = base_key_name + ".tls.key"
- acme_enabled = bool(acme_domain)
- acme_domain = "matrix.example.com"
-
- default_acme_account_file = os.path.join(data_dir_path, "acme_account.key")
-
- # this is to avoid the max line length. Sorrynotsorry
- proxypassline = (
- "ProxyPass /.well-known/acme-challenge "
- "http://localhost:8009/.well-known/acme-challenge"
- )
-
# flake8 doesn't recognise that variables are used in the below string
- _ = tls_enabled, proxypassline, acme_enabled, default_acme_account_file
+ _ = tls_enabled
return (
"""\
@@ -274,13 +225,9 @@ class TlsConfig(Config):
# This certificate, as of Synapse 1.0, will need to be a valid and verifiable
# certificate, signed by a recognised Certificate Authority.
#
- # See 'ACME support' below to enable auto-provisioning this certificate via
- # Let's Encrypt.
- #
- # If supplying your own, be sure to use a `.pem` file that includes the
- # full certificate chain including any intermediate certificates (for
- # instance, if using certbot, use `fullchain.pem` as your certificate,
- # not `cert.pem`).
+ # Be sure to use a `.pem` file that includes the full certificate chain including
+ # any intermediate certificates (for instance, if using certbot, use
+ # `fullchain.pem` as your certificate, not `cert.pem`).
#
%(tls_enabled)stls_certificate_path: "%(tls_certificate_path)s"
@@ -330,80 +277,6 @@ class TlsConfig(Config):
# - myCA1.pem
# - myCA2.pem
# - myCA3.pem
-
- # ACME support: This will configure Synapse to request a valid TLS certificate
- # for your configured `server_name` via Let's Encrypt.
- #
- # Note that ACME v1 is now deprecated, and Synapse currently doesn't support
- # ACME v2. This means that this feature currently won't work with installs set
- # up after November 2019. For more info, and alternative solutions, see
- # https://github.com/matrix-org/synapse/blob/master/docs/ACME.md#deprecation-of-acme-v1
- #
- # Note that provisioning a certificate in this way requires port 80 to be
- # routed to Synapse so that it can complete the http-01 ACME challenge.
- # By default, if you enable ACME support, Synapse will attempt to listen on
- # port 80 for incoming http-01 challenges - however, this will likely fail
- # with 'Permission denied' or a similar error.
- #
- # There are a couple of potential solutions to this:
- #
- # * If you already have an Apache, Nginx, or similar listening on port 80,
- # you can configure Synapse to use an alternate port, and have your web
- # server forward the requests. For example, assuming you set 'port: 8009'
- # below, on Apache, you would write:
- #
- # %(proxypassline)s
- #
- # * Alternatively, you can use something like `authbind` to give Synapse
- # permission to listen on port 80.
- #
- acme:
- # ACME support is disabled by default. Set this to `true` and uncomment
- # tls_certificate_path and tls_private_key_path above to enable it.
- #
- enabled: %(acme_enabled)s
-
- # Endpoint to use to request certificates. If you only want to test,
- # use Let's Encrypt's staging url:
- # https://acme-staging.api.letsencrypt.org/directory
- #
- #url: https://acme-v01.api.letsencrypt.org/directory
-
- # Port number to listen on for the HTTP-01 challenge. Change this if
- # you are forwarding connections through Apache/Nginx/etc.
- #
- port: 80
-
- # Local addresses to listen on for incoming connections.
- # Again, you may want to change this if you are forwarding connections
- # through Apache/Nginx/etc.
- #
- bind_addresses: ['::', '0.0.0.0']
-
- # How many days remaining on a certificate before it is renewed.
- #
- reprovision_threshold: 30
-
- # The domain that the certificate should be for. Normally this
- # should be the same as your Matrix domain (i.e., 'server_name'), but,
- # by putting a file at 'https://<server_name>/.well-known/matrix/server',
- # you can delegate incoming traffic to another server. If you do that,
- # you should give the target of the delegation here.
- #
- # For example: if your 'server_name' is 'example.com', but
- # 'https://example.com/.well-known/matrix/server' delegates to
- # 'matrix.example.com', you should put 'matrix.example.com' here.
- #
- # If not set, defaults to your 'server_name'.
- #
- domain: %(acme_domain)s
-
- # file to use for the account key. This will be generated if it doesn't
- # exist.
- #
- # If unspecified, we will use CONFDIR/client.key.
- #
- account_key_file: %(default_acme_account_file)s
"""
# Lowercase the string representation of boolean values
% {
@@ -415,8 +288,6 @@ class TlsConfig(Config):
def read_tls_certificate(self) -> crypto.X509:
"""Reads the TLS certificate from the configured file, and returns it
- Also checks if it is self-signed, and warns if so
-
Returns:
The certificate
"""
@@ -425,16 +296,6 @@ class TlsConfig(Config):
cert_pem = self.read_file(cert_path, "tls_certificate_path")
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
- # Check if it is self-signed, and issue a warning if so.
- if cert.get_issuer() == cert.get_subject():
- warnings.warn(
- (
- "Self-signed TLS certificates will not be accepted by Synapse 1.0. "
- "Please either provide a valid certificate, or use Synapse's ACME "
- "support to provision one."
- )
- )
-
return cert
def read_tls_private_key(self) -> crypto.PKey:
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index c8b52cbc7a..0cb9c1cc1e 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -119,6 +119,7 @@ class _EventInternalMetadata:
redacted = DictProperty("redacted") # type: bool
txn_id = DictProperty("txn_id") # type: str
token_id = DictProperty("token_id") # type: str
+ historical = DictProperty("historical") # type: bool
# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
@@ -204,6 +205,14 @@ class _EventInternalMetadata:
"""
return self._dict.get("redacted", False)
+ def is_historical(self) -> bool:
+ """Whether this is a historical message.
+ This is used by the batch send historical message endpoint and is
+ needed to mark the event as backfilled and skip some checks
+ like push notifications.
+ """
+ return self._dict.get("historical", False)
+
class EventBase(metaclass=abc.ABCMeta):
@property
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 5793553a88..81bf8615b7 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
from typing import Any, Dict, List, Optional, Tuple, Union
import attr
@@ -33,6 +34,8 @@ from synapse.types import EventID, JsonDict
from synapse.util import Clock
from synapse.util.stringutils import random_string
+logger = logging.getLogger(__name__)
+
@attr.s(slots=True, cmp=False, frozen=True)
class EventBuilder:
@@ -100,6 +103,7 @@ class EventBuilder:
self,
prev_event_ids: List[str],
auth_event_ids: Optional[List[str]],
+ depth: Optional[int] = None,
) -> EventBase:
"""Transform into a fully signed and hashed event
@@ -108,6 +112,9 @@ class EventBuilder:
auth_event_ids: The event IDs to use as the auth events.
Should normally be set to None, which will cause them to be calculated
based on the room state at the prev_events.
+ depth: Override the depth used to order the event in the DAG.
+ Should normally be set to None, which will cause the depth to be calculated
+ based on the prev_events.
Returns:
The signed and hashed event.
@@ -131,8 +138,14 @@ class EventBuilder:
auth_events = auth_event_ids
prev_events = prev_event_ids
- old_depth = await self._store.get_max_depth_of(prev_event_ids)
- depth = old_depth + 1
+ # If no depth override was given, progress the depth as normal
+ if depth is None:
+ (
+ _,
+ most_recent_prev_event_depth,
+ ) = await self._store.get_max_depth_of(prev_event_ids)
+
+ depth = most_recent_prev_event_depth + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
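
A hedged usage sketch of the new `depth` override. `builder`, the event ID lists, and `inherited_depth` are assumed names in an MSC2716-style caller that wants an imported event to sort alongside its insertion point rather than one past the newest prev event:

```python
# Inside an async caller; all names here are hypothetical.
event = await builder.build(
    prev_event_ids=prev_event_ids,
    auth_event_ids=auth_event_ids,
    depth=inherited_depth,  # e.g. copied from the event being inserted after
)
```
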
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index d5fa195094..45ec96dfc1 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -15,7 +15,18 @@
import inspect
import logging
-from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Callable,
+ Collection,
+ Dict,
+ List,
+ Optional,
+ Tuple,
+ Union,
+)
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
@@ -29,20 +40,186 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+CHECK_EVENT_FOR_SPAM_CALLBACK = Callable[
+ ["synapse.events.EventBase"],
+ Awaitable[Union[bool, str]],
+]
+USER_MAY_INVITE_CALLBACK = Callable[[str, str, str], Awaitable[bool]]
+USER_MAY_CREATE_ROOM_CALLBACK = Callable[[str], Awaitable[bool]]
+USER_MAY_CREATE_ROOM_ALIAS_CALLBACK = Callable[[str, RoomAlias], Awaitable[bool]]
+USER_MAY_PUBLISH_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]]
+CHECK_USERNAME_FOR_SPAM_CALLBACK = Callable[[Dict[str, str]], Awaitable[bool]]
+LEGACY_CHECK_REGISTRATION_FOR_SPAM_CALLBACK = Callable[
+ [
+ Optional[dict],
+ Optional[str],
+ Collection[Tuple[str, str]],
+ ],
+ Awaitable[RegistrationBehaviour],
+]
+CHECK_REGISTRATION_FOR_SPAM_CALLBACK = Callable[
+ [
+ Optional[dict],
+ Optional[str],
+ Collection[Tuple[str, str]],
+ Optional[str],
+ ],
+ Awaitable[RegistrationBehaviour],
+]
+CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK = Callable[
+ [ReadableFileWrapper, FileInfo],
+ Awaitable[bool],
+]
+
+
+def load_legacy_spam_checkers(hs: "synapse.server.HomeServer"):
+ """Wrapper that loads spam checkers configured using the old configuration, and
+ registers the spam checker hooks they implement.
+ """
+ spam_checkers = [] # type: List[Any]
+ api = hs.get_module_api()
+ for module, config in hs.config.spam_checkers:
+ # Older spam checkers don't accept the `api` argument, so we
+ # try and detect support.
+ spam_args = inspect.getfullargspec(module)
+ if "api" in spam_args.args:
+ spam_checkers.append(module(config=config, api=api))
+ else:
+ spam_checkers.append(module(config=config))
+
+ # The known spam checker hooks. If a spam checker module implements a method
+ # whose name appears in this set, we'll want to register it.
+ spam_checker_methods = {
+ "check_event_for_spam",
+ "user_may_invite",
+ "user_may_create_room",
+ "user_may_create_room_alias",
+ "user_may_publish_room",
+ "check_username_for_spam",
+ "check_registration_for_spam",
+ "check_media_file_for_spam",
+ }
+
+ for spam_checker in spam_checkers:
+ # Methods on legacy spam checkers might not be async, so we wrap them in a
+ # helper that calls maybe_awaitable on the result.
+ def async_wrapper(f: Optional[Callable]) -> Optional[Callable[..., Awaitable]]:
+ # f might be None if the callback isn't implemented by the module. In this
+ # case we don't want to register a callback at all so we return None.
+ if f is None:
+ return None
+
+ if f.__name__ == "check_registration_for_spam":
+ checker_args = inspect.signature(f)
+ if len(checker_args.parameters) == 3:
+ # Backwards compatibility; some modules might implement a hook that
+ # doesn't expect a 4th argument. In this case, wrap it in a function
+ # that gives it only 3 arguments and drops the auth_provider_id on
+ # the floor.
+ def wrapper(
+ email_threepid: Optional[dict],
+ username: Optional[str],
+ request_info: Collection[Tuple[str, str]],
+ auth_provider_id: Optional[str],
+ ) -> Union[Awaitable[RegistrationBehaviour], RegistrationBehaviour]:
+ # We've already made sure f is not None above, but mypy doesn't
+ # do well across function boundaries so we need to tell it f is
+ # definitely not None.
+ assert f is not None
+
+ return f(
+ email_threepid,
+ username,
+ request_info,
+ )
+
+ f = wrapper
+ elif len(checker_args.parameters) != 4:
+ raise RuntimeError(
+ "Bad signature for callback check_registration_for_spam",
+ )
+
+ def run(*args, **kwargs):
+ # We've already made sure f is not None above, but mypy doesn't do well
+ # across function boundaries so we need to tell it f is definitely not
+ # None.
+ assert f is not None
+
+ return maybe_awaitable(f(*args, **kwargs))
+
+ return run
+
+ # Register the hooks through the module API.
+ hooks = {
+ hook: async_wrapper(getattr(spam_checker, hook, None))
+ for hook in spam_checker_methods
+ }
+
+ api.register_spam_checker_callbacks(**hooks)
+
class SpamChecker:
- def __init__(self, hs: "synapse.server.HomeServer"):
- self.spam_checkers = [] # type: List[Any]
- api = hs.get_module_api()
-
- for module, config in hs.config.spam_checkers:
- # Older spam checkers don't accept the `api` argument, so we
- # try and detect support.
- spam_args = inspect.getfullargspec(module)
- if "api" in spam_args.args:
- self.spam_checkers.append(module(config=config, api=api))
- else:
- self.spam_checkers.append(module(config=config))
+ def __init__(self):
+ self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
+ self._user_may_invite_callbacks: List[USER_MAY_INVITE_CALLBACK] = []
+ self._user_may_create_room_callbacks: List[USER_MAY_CREATE_ROOM_CALLBACK] = []
+ self._user_may_create_room_alias_callbacks: List[
+ USER_MAY_CREATE_ROOM_ALIAS_CALLBACK
+ ] = []
+ self._user_may_publish_room_callbacks: List[USER_MAY_PUBLISH_ROOM_CALLBACK] = []
+ self._check_username_for_spam_callbacks: List[
+ CHECK_USERNAME_FOR_SPAM_CALLBACK
+ ] = []
+ self._check_registration_for_spam_callbacks: List[
+ CHECK_REGISTRATION_FOR_SPAM_CALLBACK
+ ] = []
+ self._check_media_file_for_spam_callbacks: List[
+ CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK
+ ] = []
+
+ def register_callbacks(
+ self,
+ check_event_for_spam: Optional[CHECK_EVENT_FOR_SPAM_CALLBACK] = None,
+ user_may_invite: Optional[USER_MAY_INVITE_CALLBACK] = None,
+ user_may_create_room: Optional[USER_MAY_CREATE_ROOM_CALLBACK] = None,
+ user_may_create_room_alias: Optional[
+ USER_MAY_CREATE_ROOM_ALIAS_CALLBACK
+ ] = None,
+ user_may_publish_room: Optional[USER_MAY_PUBLISH_ROOM_CALLBACK] = None,
+ check_username_for_spam: Optional[CHECK_USERNAME_FOR_SPAM_CALLBACK] = None,
+ check_registration_for_spam: Optional[
+ CHECK_REGISTRATION_FOR_SPAM_CALLBACK
+ ] = None,
+ check_media_file_for_spam: Optional[CHECK_MEDIA_FILE_FOR_SPAM_CALLBACK] = None,
+ ):
+ """Register callbacks from module for each hook."""
+ if check_event_for_spam is not None:
+ self._check_event_for_spam_callbacks.append(check_event_for_spam)
+
+ if user_may_invite is not None:
+ self._user_may_invite_callbacks.append(user_may_invite)
+
+ if user_may_create_room is not None:
+ self._user_may_create_room_callbacks.append(user_may_create_room)
+
+ if user_may_create_room_alias is not None:
+ self._user_may_create_room_alias_callbacks.append(
+ user_may_create_room_alias,
+ )
+
+ if user_may_publish_room is not None:
+ self._user_may_publish_room_callbacks.append(user_may_publish_room)
+
+ if check_username_for_spam is not None:
+ self._check_username_for_spam_callbacks.append(check_username_for_spam)
+
+ if check_registration_for_spam is not None:
+ self._check_registration_for_spam_callbacks.append(
+ check_registration_for_spam,
+ )
+
+ if check_media_file_for_spam is not None:
+ self._check_media_file_for_spam_callbacks.append(check_media_file_for_spam)
async def check_event_for_spam(
self, event: "synapse.events.EventBase"
@@ -60,9 +237,10 @@ class SpamChecker:
True or a string if the event is spammy. If a string is returned it
will be used as the error message returned to the user.
"""
- for spam_checker in self.spam_checkers:
- if await maybe_awaitable(spam_checker.check_event_for_spam(event)):
- return True
+ for callback in self._check_event_for_spam_callbacks:
+ res = await callback(event) # type: Union[bool, str]
+ if res:
+ return res
return False
@@ -81,15 +259,8 @@ class SpamChecker:
Returns:
True if the user may send an invite, otherwise False
"""
- for spam_checker in self.spam_checkers:
- if (
- await maybe_awaitable(
- spam_checker.user_may_invite(
- inviter_userid, invitee_userid, room_id
- )
- )
- is False
- ):
+ for callback in self._user_may_invite_callbacks:
+ if await callback(inviter_userid, invitee_userid, room_id) is False:
return False
return True
@@ -105,11 +276,8 @@ class SpamChecker:
Returns:
True if the user may create a room, otherwise False
"""
- for spam_checker in self.spam_checkers:
- if (
- await maybe_awaitable(spam_checker.user_may_create_room(userid))
- is False
- ):
+ for callback in self._user_may_create_room_callbacks:
+ if await callback(userid) is False:
return False
return True
@@ -128,13 +296,8 @@ class SpamChecker:
Returns:
True if the user may create a room alias, otherwise False
"""
- for spam_checker in self.spam_checkers:
- if (
- await maybe_awaitable(
- spam_checker.user_may_create_room_alias(userid, room_alias)
- )
- is False
- ):
+ for callback in self._user_may_create_room_alias_callbacks:
+ if await callback(userid, room_alias) is False:
return False
return True
@@ -151,13 +314,8 @@ class SpamChecker:
Returns:
True if the user may publish the room, otherwise False
"""
- for spam_checker in self.spam_checkers:
- if (
- await maybe_awaitable(
- spam_checker.user_may_publish_room(userid, room_id)
- )
- is False
- ):
+ for callback in self._user_may_publish_room_callbacks:
+ if await callback(userid, room_id) is False:
return False
return True
@@ -177,15 +335,11 @@ class SpamChecker:
Returns:
True if the user is spammy.
"""
- for spam_checker in self.spam_checkers:
- # For backwards compatibility, only run if the method exists on the
- # spam checker
- checker = getattr(spam_checker, "check_username_for_spam", None)
- if checker:
- # Make a copy of the user profile object to ensure the spam checker
- # cannot modify it.
- if await maybe_awaitable(checker(user_profile.copy())):
- return True
+ for callback in self._check_username_for_spam_callbacks:
+ # Make a copy of the user profile object to ensure the spam checker cannot
+ # modify it.
+ if await callback(user_profile.copy()):
+ return True
return False
@@ -211,33 +365,13 @@ class SpamChecker:
Enum for how the request should be handled
"""
- for spam_checker in self.spam_checkers:
- # For backwards compatibility, only run if the method exists on the
- # spam checker
- checker = getattr(spam_checker, "check_registration_for_spam", None)
- if checker:
- # Provide auth_provider_id if the function supports it
- checker_args = inspect.signature(checker)
- if len(checker_args.parameters) == 4:
- d = checker(
- email_threepid,
- username,
- request_info,
- auth_provider_id,
- )
- elif len(checker_args.parameters) == 3:
- d = checker(email_threepid, username, request_info)
- else:
- logger.error(
- "Invalid signature for %s.check_registration_for_spam. Denying registration",
- spam_checker.__module__,
- )
- return RegistrationBehaviour.DENY
-
- behaviour = await maybe_awaitable(d)
- assert isinstance(behaviour, RegistrationBehaviour)
- if behaviour != RegistrationBehaviour.ALLOW:
- return behaviour
+ for callback in self._check_registration_for_spam_callbacks:
+ behaviour = await callback(
+ email_threepid, username, request_info, auth_provider_id
+ )
+ assert isinstance(behaviour, RegistrationBehaviour)
+ if behaviour != RegistrationBehaviour.ALLOW:
+ return behaviour
return RegistrationBehaviour.ALLOW
@@ -275,13 +409,9 @@ class SpamChecker:
allowed.
"""
- for spam_checker in self.spam_checkers:
- # For backwards compatibility, only run if the method exists on the
- # spam checker
- checker = getattr(spam_checker, "check_media_file_for_spam", None)
- if checker:
- spam = await maybe_awaitable(checker(file_wrapper, file_info))
- if spam:
- return True
+ for callback in self._check_media_file_for_spam_callbacks:
+ spam = await callback(file_wrapper, file_info)
+ if spam:
+ return True
return False
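
To make the shim concrete, here is a hypothetical legacy spam checker that `load_legacy_spam_checkers` can still load: an old-style constructor, synchronous hooks (handled by the `maybe_awaitable` wrapper), and a three-argument `check_registration_for_spam` that gets wrapped so the new `auth_provider_id` argument is dropped:

```python
from synapse.spam_checker_api import RegistrationBehaviour


class LegacyChecker:
    """Illustrative old-interface spam checker."""

    # Constructors without an `api` parameter are also detected and supported.
    def __init__(self, config, api=None):
        self._config = config

    def check_event_for_spam(self, event):
        # May be sync or async; the shim wraps it with maybe_awaitable.
        return False

    def check_registration_for_spam(self, email_threepid, username, request_info):
        # Three-argument form: the shim drops auth_provider_id before calling.
        return RegistrationBehaviour.ALLOW
```
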
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 16d740cf58..bed47f8abd 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -35,6 +35,7 @@ from synapse.http.servlet import (
parse_string_from_args,
parse_strings_from_args,
)
+from synapse.logging import opentracing
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
SynapseTags,
@@ -345,6 +346,8 @@ class BaseFederationServlet:
)
with scope:
+ opentracing.inject_response_headers(request.responseHeaders)
+
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py
deleted file mode 100644
index 16ab93f580..0000000000
--- a/synapse/handlers/acme.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import TYPE_CHECKING
-
-import twisted
-import twisted.internet.error
-from twisted.web import server, static
-from twisted.web.resource import Resource
-
-from synapse.app import check_bind_error
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-ACME_REGISTER_FAIL_ERROR = """
---------------------------------------------------------------------------------
-Failed to register with the ACME provider. This is likely happening because the installation
-is new, and ACME v1 has been deprecated by Let's Encrypt and disabled for
-new installations since November 2019.
-At the moment, Synapse doesn't support ACME v2. For more information and alternative
-solutions, please read https://github.com/matrix-org/synapse/blob/master/docs/ACME.md#deprecation-of-acme-v1
---------------------------------------------------------------------------------"""
-
-
-class AcmeHandler:
- def __init__(self, hs: "HomeServer"):
- self.hs = hs
- self.reactor = hs.get_reactor()
- self._acme_domain = hs.config.acme_domain
-
- async def start_listening(self) -> None:
- from synapse.handlers import acme_issuing_service
-
- # Configure logging for txacme, if you need to debug
- # from eliot import add_destinations
- # from eliot.twisted import TwistedDestination
- #
- # add_destinations(TwistedDestination())
-
- well_known = Resource()
-
- self._issuer = acme_issuing_service.create_issuing_service(
- self.reactor,
- acme_url=self.hs.config.acme_url,
- account_key_file=self.hs.config.acme_account_key_file,
- well_known_resource=well_known,
- )
-
- responder_resource = Resource()
- responder_resource.putChild(b".well-known", well_known)
- responder_resource.putChild(b"check", static.Data(b"OK", b"text/plain"))
- srv = server.Site(responder_resource)
-
- bind_addresses = self.hs.config.acme_bind_addresses
- for host in bind_addresses:
- logger.info(
- "Listening for ACME requests on %s:%i", host, self.hs.config.acme_port
- )
- try:
- self.reactor.listenTCP(
- self.hs.config.acme_port, srv, backlog=50, interface=host
- )
- except twisted.internet.error.CannotListenError as e:
- check_bind_error(e, host, bind_addresses)
-
- # Make sure we are registered to the ACME server. There's no public API
- # for this, it is usually triggered by startService, but since we don't
- # want it to control where we save the certificates, we have to reach in
- # and trigger the registration machinery ourselves.
- self._issuer._registered = False
-
- try:
- await self._issuer._ensure_registered()
- except Exception:
- logger.error(ACME_REGISTER_FAIL_ERROR)
- raise
-
- async def provision_certificate(self) -> None:
-
- logger.warning("Reprovisioning %s", self._acme_domain)
-
- try:
- await self._issuer.issue_cert(self._acme_domain)
- except Exception:
- logger.exception("Fail!")
- raise
- logger.warning("Reprovisioned %s, saving.", self._acme_domain)
- cert_chain = self._issuer.cert_store.certs[self._acme_domain]
-
- try:
- with open(self.hs.config.tls_private_key_file, "wb") as private_key_file:
- for x in cert_chain:
- if x.startswith(b"-----BEGIN RSA PRIVATE KEY-----"):
- private_key_file.write(x)
-
- with open(self.hs.config.tls_certificate_file, "wb") as certificate_file:
- for x in cert_chain:
- if x.startswith(b"-----BEGIN CERTIFICATE-----"):
- certificate_file.write(x)
- except Exception:
- logger.exception("Failed saving!")
- raise
diff --git a/synapse/handlers/acme_issuing_service.py b/synapse/handlers/acme_issuing_service.py
deleted file mode 100644
index a972d3fa0a..0000000000
--- a/synapse/handlers/acme_issuing_service.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright 2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Utility function to create an ACME issuing service.
-
-This file contains the unconditional imports on the acme and cryptography bits that we
-only need (and may only have available) if we are doing ACME, so is designed to be
-imported conditionally.
-"""
-import logging
-from typing import Dict, Iterable, List
-
-import attr
-import pem
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import serialization
-from josepy import JWKRSA
-from josepy.jwa import RS256
-from txacme.challenges import HTTP01Responder
-from txacme.client import Client
-from txacme.interfaces import ICertificateStore
-from txacme.service import AcmeIssuingService
-from txacme.util import generate_private_key
-from zope.interface import implementer
-
-from twisted.internet import defer
-from twisted.internet.interfaces import IReactorTCP
-from twisted.python.filepath import FilePath
-from twisted.python.url import URL
-from twisted.web.resource import IResource
-
-logger = logging.getLogger(__name__)
-
-
-def create_issuing_service(
- reactor: IReactorTCP,
- acme_url: str,
- account_key_file: str,
- well_known_resource: IResource,
-) -> AcmeIssuingService:
- """Create an ACME issuing service, and attach it to a web Resource
-
- Args:
- reactor: twisted reactor
- acme_url: URL to use to request certificates
- account_key_file: where to store the account key
- well_known_resource: web resource for .well-known.
- we will attach a child resource for "acme-challenge".
-
- Returns:
- AcmeIssuingService
- """
- responder = HTTP01Responder()
-
- well_known_resource.putChild(b"acme-challenge", responder.resource)
-
- store = ErsatzStore()
-
- return AcmeIssuingService(
- cert_store=store,
- client_creator=(
- lambda: Client.from_url(
- reactor=reactor,
- url=URL.from_text(acme_url),
- key=load_or_create_client_key(account_key_file),
- alg=RS256,
- )
- ),
- clock=reactor,
- responders=[responder],
- )
-
-
-@attr.s(slots=True)
-@implementer(ICertificateStore)
-class ErsatzStore:
- """
- A store that only stores in memory.
- """
-
- certs = attr.ib(type=Dict[bytes, List[bytes]], default=attr.Factory(dict))
-
- def store(
- self, server_name: bytes, pem_objects: Iterable[pem.AbstractPEMObject]
- ) -> defer.Deferred:
- self.certs[server_name] = [o.as_bytes() for o in pem_objects]
- return defer.succeed(None)
-
-
-def load_or_create_client_key(key_file: str) -> JWKRSA:
- """Load the ACME account key from a file, creating it if it does not exist.
-
- Args:
- key_file: name of the file to use as the account key
- """
- # this is based on txacme.endpoint.load_or_create_client_key, but doesn't
- # hardcode the 'client.key' filename
- acme_key_file = FilePath(key_file)
- if acme_key_file.exists():
- logger.info("Loading ACME account key from '%s'", acme_key_file)
- key = serialization.load_pem_private_key(
- acme_key_file.getContent(), password=None, backend=default_backend()
- )
- else:
- logger.info("Saving new ACME account key to '%s'", acme_key_file)
- key = generate_private_key("rsa")
- acme_key_file.setContent(
- key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption(),
- )
- )
- return JWKRSA(key=key)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 8a6666a4ad..1971e373ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -302,6 +302,7 @@ class AuthHandler(BaseHandler):
request: SynapseRequest,
request_body: Dict[str, Any],
description: str,
+ can_skip_ui_auth: bool = False,
) -> Tuple[dict, Optional[str]]:
"""
Checks that the user is who they claim to be, via a UI auth.
@@ -320,6 +321,10 @@ class AuthHandler(BaseHandler):
description: A human readable string to be displayed to the user that
describes the operation happening on their account.
+ can_skip_ui_auth: True if the UI auth session timeout applies to this
+ action. Should be set to False for any "dangerous"
+ actions (e.g. deactivating an account).
+
Returns:
A tuple of (params, session_id).
@@ -343,7 +348,7 @@ class AuthHandler(BaseHandler):
"""
if not requester.access_token_id:
raise ValueError("Cannot validate a user without an access token")
- if self._ui_auth_session_timeout:
+ if can_skip_ui_auth and self._ui_auth_session_timeout:
last_validated = await self.store.get_access_token_last_validated(
requester.access_token_id
)
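
A minimal sketch of the gating this hunk introduces, with simplified names and a hand-rolled clock (the real check lives in the AuthHandler method patched above): UI auth is only skipped when the endpoint opts in, a session timeout is configured, and the access token was validated recently enough.

import time

def should_skip_ui_auth(
    can_skip_ui_auth: bool,
    ui_auth_session_timeout_ms: int,
    last_validated_ms: int,
) -> bool:
    # Dangerous endpoints pass can_skip_ui_auth=False and always re-auth.
    if not (can_skip_ui_auth and ui_auth_session_timeout_ms):
        return False
    now_ms = int(time.time() * 1000)
    return now_ms - last_validated_ms < ui_auth_session_timeout_ms
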
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index a0df16a32f..989996b628 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -13,7 +13,12 @@
# limitations under the License.
from typing import TYPE_CHECKING, Collection, Optional
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import (
+ EventTypes,
+ JoinRules,
+ Membership,
+ RestrictedJoinRuleTypes,
+)
from synapse.api.errors import AuthError
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
@@ -42,7 +47,7 @@ class EventAuthHandler:
Check whether a user can join a room without an invite due to restricted join rules.
When joining a room with restricted join rules (as defined in MSC3083),
- the membership of spaces must be checked during a room join.
+ the membership of rooms must be checked during a room join.
Args:
state_ids: The state of the room as it currently is.
@@ -67,20 +72,20 @@ class EventAuthHandler:
if not await self.has_restricted_join_rules(state_ids, room_version):
return
- # Get the spaces which allow access to this room and check if the user is
+ # Get the rooms which allow access to this room and check if the user is
# in any of them.
- allowed_spaces = await self.get_spaces_that_allow_join(state_ids)
- if not await self.is_user_in_rooms(allowed_spaces, user_id):
+ allowed_rooms = await self.get_rooms_that_allow_join(state_ids)
+ if not await self.is_user_in_rooms(allowed_rooms, user_id):
raise AuthError(
403,
- "You do not belong to any of the required spaces to join this room.",
+ "You do not belong to any of the required rooms to join this room.",
)
async def has_restricted_join_rules(
self, state_ids: StateMap[str], room_version: RoomVersion
) -> bool:
"""
- Return if the room has the proper join rules set for access via spaces.
+ Return if the room has the proper join rules set for access via rooms.
Args:
state_ids: The state of the room as it currently is.
@@ -102,17 +107,17 @@ class EventAuthHandler:
join_rules_event = await self._store.get_event(join_rules_event_id)
return join_rules_event.content.get("join_rule") == JoinRules.MSC3083_RESTRICTED
- async def get_spaces_that_allow_join(
+ async def get_rooms_that_allow_join(
self, state_ids: StateMap[str]
) -> Collection[str]:
"""
- Generate a list of spaces which allow access to a room.
+ Generate a list of rooms in which membership allows access to a room.
Args:
- state_ids: The state of the room as it currently is.
+ state_ids: The current state of the room the user wishes to join
Returns:
- A collection of spaces which provide membership to the room.
+ A collection of room IDs. Membership in any of the rooms in the list
+ grants the ability to join the target room.
"""
# If there's no join rule, then it defaults to invite (so this doesn't apply).
join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
@@ -123,21 +128,25 @@ class EventAuthHandler:
join_rules_event = await self._store.get_event(join_rules_event_id)
# If allowed is of the wrong form, then only allow invited users.
- allowed_spaces = join_rules_event.content.get("allow", [])
- if not isinstance(allowed_spaces, list):
+ allow_list = join_rules_event.content.get("allow", [])
+ if not isinstance(allow_list, list):
return ()
# Pull out the other room IDs, invalid data gets filtered.
result = []
- for space in allowed_spaces:
- if not isinstance(space, dict):
+ for allow in allow_list:
+ if not isinstance(allow, dict):
+ continue
+
+ # If the type is unexpected, skip it.
+ if allow.get("type") != RestrictedJoinRuleTypes.ROOM_MEMBERSHIP:
continue
- space_id = space.get("space")
- if not isinstance(space_id, str):
+ room_id = allow.get("room_id")
+ if not isinstance(room_id, str):
continue
- result.append(space_id)
+ result.append(room_id)
return result
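
Illustrative only: the shape of a restricted join rule's content that the rewritten code above accepts. Entries that are not dicts, have an unknown "type", or carry a malformed "room_id" are filtered out; the values below are invented.

join_rules_content = {
    "join_rule": "restricted",
    "allow": [
        {"type": "m.room_membership", "room_id": "!space:example.org"},
        {"type": "other.unknown_type", "room_id": "!ignored:example.org"},
        {"room_id": 42},  # room_id is not a str and there is no type
    ],
}

allowed = [
    allow["room_id"]
    for allow in join_rules_content["allow"]
    if isinstance(allow, dict)
    and allow.get("type") == "m.room_membership"
    and isinstance(allow.get("room_id"), str)
]
assert allowed == ["!space:example.org"]
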
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b3a93212f1..1b566dbf2d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1961,7 +1961,7 @@ class FederationHandler(BaseHandler):
return event
async def on_send_leave_request(self, origin: str, pdu: EventBase) -> None:
- """ We have received a leave event for a room. Fully process it."""
+ """We have received a leave event for a room. Fully process it."""
event = pdu
logger.debug(
@@ -2086,8 +2086,6 @@ class FederationHandler(BaseHandler):
context = await self.state_handler.compute_event_context(event)
- await self._auth_and_persist_event(origin, event, context)
-
event_allowed = await self.third_party_event_rules.check_event_allowed(
event, context
)
@@ -2097,6 +2095,8 @@ class FederationHandler(BaseHandler):
403, "This event is not allowed in this context", Codes.FORBIDDEN
)
+ await self._auth_and_persist_event(origin, event, context)
+
return context
async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
@@ -2423,7 +2423,11 @@ class FederationHandler(BaseHandler):
)
async def _check_for_soft_fail(
- self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool
+ self,
+ event: EventBase,
+ state: Optional[Iterable[EventBase]],
+ backfilled: bool,
+ origin: str,
) -> None:
"""Checks if we should soft fail the event; if so, marks the event as
such.
@@ -2432,6 +2436,7 @@ class FederationHandler(BaseHandler):
event
state: The state at the event if we don't have all the event's prev events
backfilled: Whether the event is from backfill
+ origin: The host the event originates from.
"""
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
@@ -2501,7 +2506,17 @@ class FederationHandler(BaseHandler):
try:
event_auth.check(room_version_obj, event, auth_events=current_auth_events)
except AuthError as e:
- logger.warning("Soft-failing %r because %s", event, e)
+ logger.warning(
+ "Soft-failing %r (from %s) because %s",
+ event,
+ origin,
+ e,
+ extra={
+ "room_id": event.room_id,
+ "mxid": event.sender,
+ "hs": origin,
+ },
+ )
soft_failed_event_counter.inc()
event.internal_metadata.soft_failed = True
@@ -2614,7 +2629,7 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
if not context.rejected:
- await self._check_for_soft_fail(event, state, backfilled)
+ await self._check_for_soft_fail(event, state, backfilled, origin=origin)
if event.type == EventTypes.GuestAccess and not context.rejected:
await self.maybe_kick_guest_users(event)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a8ba25bb8c..2c1b10f652 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -482,6 +482,9 @@ class EventCreationHandler:
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
+ outlier: bool = False,
+ historical: bool = False,
+ depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
Given a dict from a client, create a new event.
@@ -508,6 +511,14 @@ class EventCreationHandler:
require_consent: Whether to check if the requester has
consented to the privacy policy.
+
+ outlier: Indicates whether the event is an `outlier`, i.e. if
+ it's from an arbitrary point and floating in the DAG as
+ opposed to being inline with the current DAG.
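+ historical: Indicates whether the message is being inserted back in time
+ around some existing events. Historical events are marked as backfilled
+ and skip push notification handling.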
+ depth: Override the depth used to order the event in the DAG.
+ Should normally be set to None, which will cause the depth to be calculated
+ based on the prev_events.
+
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
@@ -563,11 +574,36 @@ class EventCreationHandler:
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
+ builder.internal_metadata.outlier = outlier
+
+ builder.internal_metadata.historical = historical
+
+ # Strip down the auth_event_ids to only what we need to auth the event.
+ # For example, we don't need extra m.room.member events that don't match event.sender
+ if auth_event_ids is not None:
+ temp_event = await builder.build(
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ depth=depth,
+ )
+ auth_events = await self.store.get_events_as_list(auth_event_ids)
+ # Create a StateMap[str]
+ auth_event_state_map = {
+ (e.type, e.state_key): e.event_id for e in auth_events
+ }
+ # Actually strip down and use the necessary auth events
+ auth_event_ids = self.auth.compute_auth_events(
+ event=temp_event,
+ current_state_ids=auth_event_state_map,
+ for_verification=False,
+ )
+
event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ depth=depth,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -724,9 +760,13 @@ class EventCreationHandler:
self,
requester: Requester,
event_dict: dict,
+ prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
+ outlier: bool = False,
+ depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
"""
Creates an event, then sends it.
@@ -736,10 +776,24 @@ class EventCreationHandler:
Args:
requester: The requester sending the event.
event_dict: An entire event.
+ prev_event_ids:
+ The event IDs to use as the prev events.
+ Should normally be left as None to automatically request them
+ from the database.
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
+ outlier: Indicates whether the event is an `outlier`, i.e. if
+ it's from an arbitrary point and floating in the DAG as
+ opposed to being inline with the current DAG.
+ depth: Override the depth used to order the event in the DAG.
+ Should normally be set to None, which will cause the depth to be calculated
+ based on the prev_events.
Returns:
The event, and its stream ordering (if deduplication happened,
@@ -779,7 +833,13 @@ class EventCreationHandler:
return event, event.internal_metadata.stream_ordering
event, context = await self.create_event(
- requester, event_dict, txn_id=txn_id
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ outlier=outlier,
+ depth=depth,
)
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -811,6 +871,7 @@ class EventCreationHandler:
requester: Optional[Requester] = None,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -828,6 +889,10 @@ class EventCreationHandler:
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+ depth: Override the depth used to order the event in the DAG.
+ Should normally be set to None, which will cause the depth to be calculated
+ based on the prev_events.
+
Returns:
Tuple of created event, context
"""
@@ -851,9 +916,24 @@ class EventCreationHandler:
), "Attempting to create an event with no prev_events"
event = await builder.build(
- prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ depth=depth,
)
- context = await self.state.compute_event_context(event)
+
+ old_state = None
+
+ # Pass on the outlier property from the builder to the event
+ # after it is created
+ if builder.internal_metadata.outlier:
+ event.internal_metadata.outlier = builder.internal_metadata.outlier
+
+ # Calculate the state for outliers that pass in their own `auth_event_ids`
+ if auth_event_ids:
+ old_state = await self.store.get_events_as_list(auth_event_ids)
+
+ context = await self.state.compute_event_context(event, old_state=old_state)
+
if requester:
context.app_service = requester.app_service
@@ -1018,7 +1098,13 @@ class EventCreationHandler:
the arguments.
"""
- await self.action_generator.handle_push_actions_for_event(event, context)
+ # Skip push notification actions for historical messages
+ # because we don't want to notify people about old history back in time.
+ # The historical messages also do not have the proper `context.current_state_ids`
+ # and `state_groups` because they have `prev_events` that aren't persisted yet
+ # (historical messages are persisted in reverse-chronological order).
+ if not event.internal_metadata.is_historical():
+ await self.action_generator.handle_push_actions_for_event(event, context)
try:
# If we're a worker we need to hit out to the master.
@@ -1317,13 +1403,21 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
+ # Mark any `m.historical` messages as backfilled so they don't appear
+ # in `/sync` and so they get the proper decrementing `stream_ordering` as we import them
+ backfilled = False
+ if event.internal_metadata.is_historical():
+ backfilled = True
+
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
- ) = await self.storage.persistence.persist_event(event, context=context)
+ ) = await self.storage.persistence.persist_event(
+ event, context=context, backfilled=backfilled
+ )
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
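
A hedged sketch of how a caller might use the new overrides on create_and_send_nonmember_event; the handler variable, event IDs and depth are invented placeholders, not values taken from this diff.

async def insert_event_at(event_creation_handler, requester, event_dict):
    event, stream_id = await event_creation_handler.create_and_send_nonmember_event(
        requester,
        event_dict,
        prev_event_ids=["$prev_event_id"],  # pin where the event hangs in the DAG
        auth_event_ids=["$create_event_id", "$power_levels_event_id"],
        outlier=False,
        depth=42,  # override the topological ordering
    )
    return event, stream_id
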
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4ceef3fab3..ca1ed6a5c0 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -195,7 +195,7 @@ class RegistrationHandler(BaseHandler):
bind_emails: list of emails to bind to this account.
by_admin: True if this registration is being made via the
admin api, otherwise False.
- user_agent_ips: Tuples of IP addresses and user-agents used
+ user_agent_ips: Tuples of user-agents and IP addresses used
during the registration process.
auth_provider_id: The SSO IdP the user used, if any.
Returns:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 04da413811..3bff2fc489 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -258,11 +258,42 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id: str,
membership: str,
prev_event_ids: List[str],
+ auth_event_ids: Optional[List[str]] = None,
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
+ outlier: bool = False,
) -> Tuple[str, int]:
+ """
+ Internal membership update function to get an existing event or create
+ and persist a new event for the new membership change.
+
+ Args:
+ requester: The user performing the membership update.
+ target: The user whose membership is being updated.
+ room_id: The room in which the membership is being updated.
+ membership: The new membership value.
+ prev_event_ids: The event IDs to use as the prev events
+
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
+ txn_id: The transaction ID, if any.
+ ratelimit: Whether to rate limit the request.
+ content: The content of the membership event.
+ require_consent: Whether consent is required.
+
+ outlier: Indicates whether the event is an `outlier`, i.e. if
+ it's from an arbitrary point and floating in the DAG as
+ opposed to being inline with the current DAG.
+
+ Returns:
+ Tuple of event ID and stream ordering position
+ """
+
user_id = target.to_string()
if content is None:
@@ -299,7 +330,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
},
txn_id=txn_id,
prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
require_consent=require_consent,
+ outlier=outlier,
)
prev_state_ids = await context.get_prev_state_ids()
@@ -400,6 +433,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
+ outlier: bool = False,
+ prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
"""Update a user's membership in a room.
@@ -414,6 +450,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: Whether to rate limit the request.
content: The content of the created event.
require_consent: Whether consent is required.
+ outlier: Indicates whether the event is an `outlier`, i.e. if
+ it's from an arbitrary point and floating in the DAG as
+ opposed to being inline with the current DAG.
+ prev_event_ids: The event IDs to use as the prev events
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
Returns:
A tuple of the new event ID and stream ID.
@@ -459,6 +503,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit=ratelimit,
content=content,
require_consent=require_consent,
+ outlier=outlier,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
return result
@@ -475,10 +522,36 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
+ outlier: bool = False,
+ prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
"""Helper for update_membership.
Assumes that the membership linearizer is already held for the room.
+
+ Args:
+ requester: The user performing the membership change.
+ target: The user whose membership is being changed.
+ room_id: The room in which the membership is being changed.
+ action: The membership change to perform (e.g. "join", "leave").
+ txn_id: The transaction ID, if any.
+ remote_room_hosts: List of servers to try and join via, if needed.
+ third_party_signed: The signed object for third party invites, if any.
+ ratelimit: Whether to rate limit the request.
+ content: The content of the membership event.
+ require_consent: Whether consent is required.
+ outlier: Indicates whether the event is an `outlier`, i.e. if
+ it's from an arbitrary point and floating in the DAG as
+ opposed to being inline with the current DAG.
+ prev_event_ids: The event IDs to use as the prev events
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
+ Returns:
+ A tuple of the new event ID and stream ID.
"""
content_specified = bool(content)
if content is None:
@@ -563,6 +636,21 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")
+ if prev_event_ids:
+ return await self._local_membership_update(
+ requester=requester,
+ target=target,
+ room_id=room_id,
+ membership=effective_membership_state,
+ txn_id=txn_id,
+ ratelimit=ratelimit,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ content=content,
+ require_consent=require_consent,
+ outlier=outlier,
+ )
+
latest_event_ids = await self.store.get_prev_events_for_room(room_id)
current_state_ids = await self.state_handler.get_current_state_ids(
@@ -752,8 +840,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
+ auth_event_ids=auth_event_ids,
content=content,
require_consent=require_consent,
+ outlier=outlier,
)
async def transfer_room_state_on_room_upgrade(
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 73d2aab15c..17fc47ce16 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -160,14 +160,14 @@ class SpaceSummaryHandler:
# Check if the user is a member of any of the allowed spaces
# from the response.
- allowed_spaces = room.get("allowed_spaces")
+ allowed_rooms = room.get("allowed_spaces")
if (
not include_room
- and allowed_spaces
- and isinstance(allowed_spaces, list)
+ and allowed_rooms
+ and isinstance(allowed_rooms, list)
):
include_room = await self._event_auth_handler.is_user_in_rooms(
- allowed_spaces, requester
+ allowed_rooms, requester
)
# Finally, if this isn't the requested room, check ourselves
@@ -445,21 +445,20 @@ class SpaceSummaryHandler:
member_event_id = state_ids.get((EventTypes.Member, requester), None)
# If they're in the room they can see info on it.
- member_event = None
if member_event_id:
member_event = await self._store.get_event(member_event_id)
if member_event.membership in (Membership.JOIN, Membership.INVITE):
return True
# Otherwise, check if they should be allowed access via membership in a space.
- if self._event_auth_handler.has_restricted_join_rules(
+ if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
):
- allowed_spaces = (
- await self._event_auth_handler.get_spaces_that_allow_join(state_ids)
+ allowed_rooms = (
+ await self._event_auth_handler.get_rooms_that_allow_join(state_ids)
)
if await self._event_auth_handler.is_user_in_rooms(
- allowed_spaces, requester
+ allowed_rooms, requester
):
return True
@@ -475,10 +474,10 @@ class SpaceSummaryHandler:
if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
):
- allowed_spaces = (
- await self._event_auth_handler.get_spaces_that_allow_join(state_ids)
+ allowed_rooms = (
+ await self._event_auth_handler.get_rooms_that_allow_join(state_ids)
)
- for space_id in allowed_spaces:
+ for space_id in allowed_rooms:
if await self._auth.check_host_in_room(space_id, origin):
return True
@@ -512,11 +511,11 @@ class SpaceSummaryHandler:
)
room_version = await self._store.get_room_version(room_id)
- allowed_spaces = None
+ allowed_rooms = None
if await self._event_auth_handler.has_restricted_join_rules(
current_state_ids, room_version
):
- allowed_spaces = await self._event_auth_handler.get_spaces_that_allow_join(
+ allowed_rooms = await self._event_auth_handler.get_rooms_that_allow_join(
current_state_ids
)
@@ -533,7 +532,7 @@ class SpaceSummaryHandler:
"guest_can_join": stats["guest_access"] == "can_join",
"creation_ts": create_event.origin_server_ts,
"room_type": create_event.content.get(EventContentFields.ROOM_TYPE),
- "allowed_spaces": allowed_spaces,
+ "allowed_spaces": allowed_rooms,
}
# Filter out Nones – rather omit the field altogether
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 044ff06d84..0b297e54c4 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -41,7 +41,12 @@ from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http import get_request_user_agent
from synapse.http.server import respond_with_html, respond_with_redirect
from synapse.http.site import SynapseRequest
-from synapse.types import JsonDict, UserID, contains_invalid_mxid_characters
+from synapse.types import (
+ JsonDict,
+ UserID,
+ contains_invalid_mxid_characters,
+ create_requester,
+)
from synapse.util.async_helpers import Linearizer
from synapse.util.stringutils import random_string
@@ -185,11 +190,14 @@ class SsoHandler:
self._auth_handler = hs.get_auth_handler()
self._error_template = hs.config.sso_error_template
self._bad_user_template = hs.config.sso_auth_bad_user_template
+ self._profile_handler = hs.get_profile_handler()
# The following template is shown after a successful user interactive
# authentication session. It tells the user they can close the window.
self._sso_auth_success_template = hs.config.sso_auth_success_template
+ self._sso_update_profile_information = hs.config.sso_update_profile_information
+
# a lock on the mappings
self._mapping_lock = Linearizer(name="sso_user_mapping", clock=hs.get_clock())
@@ -458,6 +466,21 @@ class SsoHandler:
request.getClientIP(),
)
new_user = True
+ elif self._sso_update_profile_information:
+ attributes = await self._call_attribute_mapper(sso_to_matrix_id_mapper)
+ if attributes.display_name:
+ user_id_obj = UserID.from_string(user_id)
+ profile_display_name = await self._profile_handler.get_displayname(
+ user_id_obj
+ )
+ if profile_display_name != attributes.display_name:
+ requester = create_requester(
+ user_id,
+ authenticated_entity=user_id,
+ )
+ await self._profile_handler.set_displayname(
+ user_id_obj, requester, attributes.display_name, True
+ )
await self._auth_handler.complete_sso_login(
user_id,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 64f74d4c94..151484e21e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -49,7 +49,7 @@ from synapse.types import (
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client
@@ -84,12 +84,15 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+SyncRequestKey = Tuple[Any, ...]
+
+
@attr.s(slots=True, frozen=True)
class SyncConfig:
user = attr.ib(type=UserID)
filter_collection = attr.ib(type=FilterCollection)
is_guest = attr.ib(type=bool)
- request_key = attr.ib(type=Tuple[Any, ...])
+ request_key = attr.ib(type=SyncRequestKey)
device_id = attr.ib(type=Optional[str])
@@ -267,9 +270,9 @@ class SyncHandler:
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(
+ self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
hs.get_clock(), "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
- ) # type: ResponseCache[Tuple[Any, ...]]
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
@@ -308,6 +311,7 @@ class SyncHandler:
since_token,
timeout,
full_state,
+ cache_context=True,
)
logger.debug("Returning sync response for %s", user_id)
return res
@@ -315,9 +319,10 @@ class SyncHandler:
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
- since_token: Optional[StreamToken] = None,
- timeout: int = 0,
- full_state: bool = False,
+ since_token: Optional[StreamToken],
+ timeout: int,
+ full_state: bool,
+ cache_context: ResponseCacheContext[SyncRequestKey],
) -> SyncResult:
if since_token is None:
sync_type = "initial_sync"
@@ -344,13 +349,13 @@ class SyncHandler:
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
- result = await self.current_sync_for_user(
+ result: SyncResult = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
)
else:
- def current_sync_callback(before_token, after_token):
- return self.current_sync_for_user(sync_config, since_token)
+ async def current_sync_callback(before_token, after_token) -> SyncResult:
+ return await self.current_sync_for_user(sync_config, since_token)
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
@@ -359,6 +364,17 @@ class SyncHandler:
from_token=since_token,
)
+ # if nothing has happened in any of the users' rooms since /sync was called,
+ # the resultant next_batch will be the same as since_token (since the result
+ # is generated when wait_for_events is first called, and not regenerated
+ # when wait_for_events times out).
+ #
+ # If that happens, we mustn't cache it, so that when the client comes back
+ # with the same cache token, we don't immediately return the same empty
+ # result, causing a tightloop. (#8518)
+ if result.next_batch == since_token:
+ cache_context.should_cache = False
+
if result:
if sync_config.filter_collection.lazy_load_members():
lazy_loaded = "true"
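
A simplified model (not the real implementation) of the ResponseCache behaviour the hunk above relies on: when the wrapped callback clears context.should_cache, the result is still returned to the caller but is not kept for the next request with the same key.

class FakeCacheContext:
    def __init__(self) -> None:
        self.should_cache = True

async def wrap(cache: dict, key, callback):
    # Rough stand-in for ResponseCache.wrap(..., cache_context=True).
    context = FakeCacheContext()
    result = await callback(cache_context=context)
    if context.should_cache:
        cache[key] = result  # later callers with `key` would get this result
    return result
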
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 3c43f32586..fda8da21b7 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -454,7 +454,7 @@ class RestServlet:
"""
def register(self, http_server):
- """ Register this servlet with the given HTTP server. """
+ """Register this servlet with the given HTTP server."""
patterns = getattr(self, "PATTERNS", None)
if patterns:
for method in ("GET", "PUT", "POST", "DELETE"):
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 8002a250a2..6e82f7c7f1 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -20,8 +20,9 @@ import logging
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
-# The properties of a standard LogRecord.
-_LOG_RECORD_ATTRIBUTES = {
+# The properties of a standard LogRecord that should be ignored when generating
+# JSON logs.
+_IGNORED_LOG_RECORD_ATTRIBUTES = {
"args",
"asctime",
"created",
@@ -59,9 +60,9 @@ class JsonFormatter(logging.Formatter):
return self._format(record, event)
def _format(self, record: logging.LogRecord, event: dict) -> str:
- # Add any extra attributes to the event.
+ # Add attributes specified via the extra keyword to the logged event.
for key, value in record.__dict__.items():
- if key not in _LOG_RECORD_ATTRIBUTES:
+ if key not in _IGNORED_LOG_RECORD_ATTRIBUTES:
event[key] = value
return _encoder.encode(event)
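
For context, this is standard logging behaviour that the renamed constant documents: keys passed via `extra` land on the LogRecord and, not being in _IGNORED_LOG_RECORD_ATTRIBUTES, are copied into the JSON event. The soft-fail warning earlier in this diff uses exactly this mechanism; the values here are placeholders.

import logging

logger = logging.getLogger(__name__)
logger.warning(
    "Soft-failing %r (from %s) because %s",
    "$event_id",
    "example.org",
    "auth error",
    extra={
        "room_id": "!room:example.org",
        "mxid": "@user:example.org",
        "hs": "example.org",
    },
)
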
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 4f18792c99..140ed711e3 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -173,6 +173,7 @@ from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Typ
import attr
from twisted.internet import defer
+from twisted.web.http_headers import Headers
from synapse.config import ConfigError
from synapse.util import json_decoder, json_encoder
@@ -668,6 +669,25 @@ def inject_header_dict(
headers[key.encode()] = [value.encode()]
+def inject_response_headers(response_headers: Headers) -> None:
+ """Inject the current trace id into the HTTP response headers"""
+ if not opentracing:
+ return
+ span = opentracing.tracer.active_span
+ if not span:
+ return
+
+ # This is a bit implementation-specific.
+ #
+ # Jaeger's Spans have a trace_id property; other implementations (including the
+ # dummy opentracing.span.Span which we use if init_tracer is not called) do not
+ # expose it
+ trace_id = getattr(span, "trace_id", None)
+
+ if trace_id is not None:
+ response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
+
+
@ensure_active_span("get the active span context as a dict", ret={})
def get_active_span_text_map(destination=None):
"""
@@ -843,6 +863,7 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
scope = start_active_span(request_name)
with scope:
+ inject_response_headers(request.responseHeaders)
try:
yield
finally:
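
A hypothetical client-side check (not part of this diff): with Jaeger tracing active, every servlet response should now carry the trace id, which can be looked up in the Jaeger UI. The homeserver URL is a placeholder.

import urllib.request

resp = urllib.request.urlopen("https://synapse.example.org/_matrix/client/versions")
trace_id = resp.headers.get("Synapse-Trace-Id")
if trace_id is not None:
    print("look up this request in Jaeger:", trace_id)
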
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index cecdc96bf5..58b255eb1b 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -16,6 +16,7 @@ import logging
from typing import TYPE_CHECKING, Any, Generator, Iterable, List, Optional, Tuple
from twisted.internet import defer
+from twisted.web.resource import IResource
from synapse.events import EventBase
from synapse.http.client import SimpleHttpClient
@@ -42,7 +43,7 @@ class ModuleApi:
can register new users etc if necessary.
"""
- def __init__(self, hs, auth_handler):
+ def __init__(self, hs: "HomeServer", auth_handler):
self._hs = hs
self._store = hs.get_datastore()
@@ -56,6 +57,33 @@ class ModuleApi:
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)
+ self._spam_checker = hs.get_spam_checker()
+
+ #################################################################################
+ # The following methods should only be called during the module's initialisation.
+
+ @property
+ def register_spam_checker_callbacks(self):
+ """Registers callbacks for spam checking capabilities."""
+ return self._spam_checker.register_callbacks
+
+ def register_web_resource(self, path: str, resource: IResource):
+ """Registers a web resource to be served at the given path.
+
+ This function should be called during initialisation of the module.
+
+ If multiple modules register a resource for the same path, the module that
+ appears the highest in the configuration file takes priority.
+
+ Args:
+ path: The path to register the resource for.
+ resource: The resource to attach to this path.
+ """
+ self._hs.register_module_web_resource(path, resource)
+
+ #########################################################################
+ # The following methods can be called by the module at any point in time.
+
@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
diff --git a/synapse/module_api/errors.py b/synapse/module_api/errors.py
index d24864c549..02bbb0be39 100644
--- a/synapse/module_api/errors.py
+++ b/synapse/module_api/errors.py
@@ -15,3 +15,4 @@
"""Exception types which are exposed as part of the stable module API"""
from synapse.api.errors import RedirectException, SynapseError # noqa: F401
+from synapse.config._base import ConfigError # noqa: F401
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 546231bec0..271c17c226 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -75,11 +75,9 @@ REQUIREMENTS = [
"phonenumbers>=8.2.0",
# we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
"prometheus_client>=0.4.0",
- # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
- # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
- # is out in November.)
+ # we use `order`, which arrived in attrs 19.2.0.
# Note: 21.1.0 broke `/sync`, see #9936
- "attrs>=19.1.0,!=21.1.0",
+ "attrs>=19.2.0,!=21.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
"bleach>=1.4.3",
@@ -98,11 +96,6 @@ CONDITIONAL_REQUIREMENTS = {
"psycopg2cffi>=2.8 ; platform_python_implementation == 'PyPy'",
"psycopg2cffi-compat==1.1 ; platform_python_implementation == 'PyPy'",
],
- # ACME support is required to provision TLS certificates from authorities
- # that use the protocol, such as Let's Encrypt.
- "acme": [
- "txacme>=0.9.2",
- ],
"saml2": [
"pysaml2>=4.5.0",
],
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 7ced4c543c..2ad7a200bb 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -571,7 +571,7 @@ class ReplicationCommandHandler:
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
):
- """"Called when get a new REMOTE_SERVER_UP command."""
+ """Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)
self._notifier.notify_remote_server_up(cmd.data)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 16d087ea60..92ebe838fd 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -19,7 +19,7 @@ import re
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from urllib import parse as urlparse
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -266,6 +266,288 @@ class RoomSendEventRestServlet(TransactionRestServlet):
)
+class RoomBatchSendEventRestServlet(TransactionRestServlet):
+ """
+ API endpoint which can insert a chunk of events historically back in time
+ next to the given `prev_event`.
+
+ `chunk_id` comes from `next_chunk_id` in the response of the batch send
+ endpoint and is derived from the "insertion" events added to each chunk.
+ It's not required for the first batch send.
+
+ `state_events_at_start` is used to define the historical state events
+ needed to auth the events like join events. These events will float
+ outside of the normal DAG as outlier's and won't be visible in the chat
+ history which also allows us to insert multiple chunks without having a bunch
+ of `@mxid joined the room` noise between each chunk.
+
+ `events` is a chronological chunk/list of events you want to insert.
+ There is a reverse-chronological constraint on chunks so once you insert
+ some messages, you can only insert older ones after that.
+ tl;dr: insert chunks from your most recent history -> oldest history.
+
+ POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&chunk_id=<chunkID>
+ {
+ "events": [ ... ],
+ "state_events_at_start": [ ... ]
+ }
+ """
+
+ PATTERNS = (
+ re.compile(
+ "^/_matrix/client/unstable/org.matrix.msc2716"
+ "/rooms/(?P<room_id>[^/]*)/batch_send$"
+ ),
+ )
+
+ def __init__(self, hs):
+ super().__init__(hs)
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.state_store = hs.get_storage().state
+ self.event_creation_handler = hs.get_event_creation_handler()
+ self.room_member_handler = hs.get_room_member_handler()
+ self.auth = hs.get_auth()
+
+ async def inherit_depth_from_prev_ids(self, prev_event_ids) -> int:
+ (
+ most_recent_prev_event_id,
+ most_recent_prev_event_depth,
+ ) = await self.store.get_max_depth_of(prev_event_ids)
+
+ # We want to insert the historical event after the `prev_event` but before the successor event
+ #
+ # We inherit depth from the successor event instead of the `prev_event`
+ # because events returned from `/messages` are first sorted by `topological_ordering`
+ # which is just the `depth` and then tie-break with `stream_ordering`.
+ #
+ # We mark these inserted historical events as "backfilled" which gives them a
+ # negative `stream_ordering`. If we use the same depth as the `prev_event`,
+ # then our historical event will tie-break and be sorted before the `prev_event`
+ # when it should come after.
+ #
+ # We want to use the successor event's depth so the historical events appear
+ # after `prev_event` (they have a larger `depth`) but before the successor
+ # event (their negative `stream_ordering` tie-breaks them earlier).
+ successor_event_ids = await self.store.get_successor_events(
+ [most_recent_prev_event_id]
+ )
+
+ # If we can't find any successor events, then it's a forward extremity of
+ # historical messages and we can just inherit from the previous historical
+ # event, which we can already assume has the correct depth for the point
+ # we want to insert at.
+ if not successor_event_ids:
+ depth = most_recent_prev_event_depth
+ else:
+ (
+ _,
+ oldest_successor_depth,
+ ) = await self.store.get_min_depth_of(successor_event_ids)
+
+ depth = oldest_successor_depth
+
+ return depth
+
+ async def on_POST(self, request, room_id):
+ requester = await self.auth.get_user_by_req(request, allow_guest=False)
+
+ if not requester.app_service:
+ raise AuthError(
+ 403,
+ "Only application services can use the /batchsend endpoint",
+ )
+
+ body = parse_json_object_from_request(request)
+ assert_params_in_dict(body, ["state_events_at_start", "events"])
+
+ prev_events_from_query = parse_strings_from_args(request.args, "prev_event")
+ chunk_id_from_query = parse_string(request, "chunk_id", default=None)
+
+ if prev_events_from_query is None:
+ raise SynapseError(
+ 400,
+ "prev_event query parameter is required when inserting historical messages back in time",
+ errcode=Codes.MISSING_PARAM,
+ )
+
+ # For the event we are inserting next to (`prev_events_from_query`),
+ # find the most recent auth events (derived from state events) that
+ # allowed that message to be sent. We will use that as a base
+ # to auth our historical messages against.
+ (
+ most_recent_prev_event_id,
+ _,
+ ) = await self.store.get_max_depth_of(prev_events_from_query)
+ # mapping from (type, state_key) -> state_event_id
+ prev_state_map = await self.state_store.get_state_ids_for_event(
+ most_recent_prev_event_id
+ )
+ # List of state event IDs
+ prev_state_ids = list(prev_state_map.values())
+ auth_event_ids = prev_state_ids
+
+ for state_event in body["state_events_at_start"]:
+ assert_params_in_dict(
+ state_event, ["type", "origin_server_ts", "content", "sender"]
+ )
+
+ logger.debug(
+ "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
+ state_event,
+ auth_event_ids,
+ )
+
+ event_dict = {
+ "type": state_event["type"],
+ "origin_server_ts": state_event["origin_server_ts"],
+ "content": state_event["content"],
+ "room_id": room_id,
+ "sender": state_event["sender"],
+ "state_key": state_event["state_key"],
+ }
+
+ # Make the state events float off on their own
+ fake_prev_event_id = "$" + random_string(43)
+
+ # TODO: This is pretty much the same as some other code to handle inserting state in this file
+ if event_dict["type"] == EventTypes.Member:
+ membership = event_dict["content"].get("membership", None)
+ event_id, _ = await self.room_member_handler.update_membership(
+ requester,
+ target=UserID.from_string(event_dict["state_key"]),
+ room_id=room_id,
+ action=membership,
+ content=event_dict["content"],
+ outlier=True,
+ prev_event_ids=[fake_prev_event_id],
+ # Make sure to use a copy of this list because we modify it
+ # later in the loop. Otherwise the event would keep a reference to
+ # the same list and pick up the IDs we append to it afterwards.
+ auth_event_ids=auth_event_ids.copy(),
+ )
+ else:
+ # TODO: Add some complement tests that adds state that is not member joins
+ # and will use this code path. Maybe we only want to support join state events
+ # and can get rid of this `else`?
+ (
+ event,
+ _,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ event_dict,
+ outlier=True,
+ prev_event_ids=[fake_prev_event_id],
+ # Make sure to use a copy of this list because we modify it
+ # later in the loop. Otherwise the event would keep a reference to
+ # the same list and pick up the IDs we append to it afterwards.
+ auth_event_ids=auth_event_ids.copy(),
+ )
+ event_id = event.event_id
+
+ auth_event_ids.append(event_id)
+
+ events_to_create = body["events"]
+
+ # If provided, connect the chunk to the last insertion point.
+ # The chunk ID passed in comes from the next_chunk_id in the
+ # "insertion" event from the previous chunk.
+ if chunk_id_from_query:
+ last_event_in_chunk = events_to_create[-1]
+ last_event_in_chunk["content"][
+ EventContentFields.MSC2716_CHUNK_ID
+ ] = chunk_id_from_query
+
+ # Add an "insertion" event to the start of each chunk (next to the oldest
+ # event in the chunk) so the next chunk can be connected to this one.
+ next_chunk_id = random_string(64)
+ insertion_event = {
+ "type": EventTypes.MSC2716_INSERTION,
+ "sender": requester.user.to_string(),
+ "content": {
+ EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
+ EventContentFields.MSC2716_HISTORICAL: True,
+ },
+ # Since the insertion event is put at the start of the chunk,
+ # where the oldest event is, copy the origin_server_ts from
+ # the first event we're inserting
+ "origin_server_ts": events_to_create[0]["origin_server_ts"],
+ }
+ # Prepend the insertion event to the start of the chunk
+ events_to_create = [insertion_event] + events_to_create
+
+ inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)
+
+ event_ids = []
+ prev_event_ids = prev_events_from_query
+ events_to_persist = []
+ for ev in events_to_create:
+ assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
+
+ # Mark all events as historical
+ # This has important semantics within the Synapse internals to backfill properly
+ ev["content"][EventContentFields.MSC2716_HISTORICAL] = True
+
+ event_dict = {
+ "type": ev["type"],
+ "origin_server_ts": ev["origin_server_ts"],
+ "content": ev["content"],
+ "room_id": room_id,
+ "sender": ev["sender"], # requester.user.to_string(),
+ "prev_events": prev_event_ids.copy(),
+ }
+
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ prev_event_ids=event_dict.get("prev_events"),
+ auth_event_ids=auth_event_ids,
+ historical=True,
+ depth=inherited_depth,
+ )
+ logger.debug(
+ "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
+ event,
+ prev_event_ids,
+ auth_event_ids,
+ )
+
+ assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+ event.sender,
+ )
+
+ events_to_persist.append((event, context))
+ event_id = event.event_id
+
+ event_ids.append(event_id)
+ prev_event_ids = [event_id]
+
+ # Persist events in reverse-chronological order so they have the
+ # correct stream_ordering as they are backfilled (which decrements).
+ # Events are sorted by (topological_ordering, stream_ordering)
+ # where topological_ordering is just depth.
+ for (event, context) in reversed(events_to_persist):
+ await self.event_creation_handler.handle_new_client_event(
+ requester=requester,
+ event=event,
+ context=context,
+ )
+
+ return 200, {
+ "state_events": auth_event_ids,
+ "events": event_ids,
+ "next_chunk_id": next_chunk_id,
+ }
+
+ def on_GET(self, request, room_id):
+ return 501, "Not implemented"
+
+ def on_PUT(self, request, room_id):
+ return self.txns.fetch_or_execute_request(
+ request, self.on_POST, request, room_id
+ )
+
+
# TODO: Needs unit testing for room ID + alias joins
class JoinRoomAliasServlet(TransactionRestServlet):
def __init__(self, hs):
@@ -1054,6 +1336,8 @@ class RoomSpaceSummaryRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server, is_worker=False):
+ msc2716_enabled = hs.config.experimental.msc2716_enabled
+
RoomStateEventRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server)
JoinedRoomMemberListRestServlet(hs).register(http_server)
@@ -1061,6 +1345,8 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
JoinRoomAliasServlet(hs).register(http_server)
RoomMembershipRestServlet(hs).register(http_server)
RoomSendEventRestServlet(hs).register(http_server)
+ if msc2716_enabled:
+ RoomBatchSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server)
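
A hypothetical application-service call against the new endpoint (all identifiers and the token are placeholders; in practice the room ID and event ID in the URL should be percent-encoded, and msc2716_enabled must be set).

import json
import urllib.request

url = (
    "https://synapse.example.org/_matrix/client/unstable/org.matrix.msc2716"
    "/rooms/!room:example.org/batch_send?prev_event=$someEventId"
)
body = {
    "state_events_at_start": [
        {
            "type": "m.room.member",
            "state_key": "@alice:example.org",
            "sender": "@alice:example.org",
            "origin_server_ts": 1500000000000,
            "content": {"membership": "join"},
        }
    ],
    "events": [
        {
            "type": "m.room.message",
            "sender": "@alice:example.org",
            "origin_server_ts": 1500000001000,
            "content": {"msgtype": "m.text", "body": "hello from the past"},
        }
    ],
}
req = urllib.request.Request(
    url,
    data=json.dumps(body).encode(),
    headers={
        "Authorization": "Bearer <as_token>",
        "Content-Type": "application/json",
    },
    method="POST",
)
# The response contains "events", "state_events" and "next_chunk_id"; pass the
# latter back as ?chunk_id= when sending the next (older) chunk.
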
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index 9af05f9b11..8b9674db06 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -86,6 +86,9 @@ class DeleteDevicesRestServlet(RestServlet):
request,
body,
"remove device(s) from your account",
+ # Users might call this multiple times in a row while cleaning up
+ # devices, allow a single UI auth session to be re-used.
+ can_skip_ui_auth=True,
)
await self.device_handler.delete_devices(
@@ -135,6 +138,9 @@ class DeviceRestServlet(RestServlet):
request,
body,
"remove a device from your account",
+ # Users might call this multiple times in a row while cleaning up
+ # devices, allow a single UI auth session to be re-used.
+ can_skip_ui_auth=True,
)
await self.device_handler.delete_device(requester.user.to_string(), device_id)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 4a28f2c072..33cf8de186 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -277,6 +277,9 @@ class SigningKeyUploadServlet(RestServlet):
request,
body,
"add a device signing key to your account",
+ # Allow skipping of UI auth since this is frequently called directly
+ # after login and it is silly to ask users to re-auth immediately.
+ can_skip_ui_auth=True,
)
result = await self.e2e_keys_handler.upload_signing_keys_for_user(user_id, body)
diff --git a/synapse/server.py b/synapse/server.py
index fec0024c89..2c27d2a7e8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -1,6 +1,4 @@
-# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2017-2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -39,6 +37,7 @@ import twisted.internet.tcp
from twisted.internet import defer
from twisted.mail.smtp import sendmail
from twisted.web.iweb import IPolicyForHTTPS
+from twisted.web.resource import IResource
from synapse.api.auth import Auth
from synapse.api.filtering import Filtering
@@ -66,7 +65,6 @@ from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionR
from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
from synapse.handlers.account_data import AccountDataHandler
from synapse.handlers.account_validity import AccountValidityHandler
-from synapse.handlers.acme import AcmeHandler
from synapse.handlers.admin import AdminHandler
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
@@ -259,6 +257,38 @@ class HomeServer(metaclass=abc.ABCMeta):
self.datastores = None # type: Optional[Databases]
+ self._module_web_resources: Dict[str, IResource] = {}
+ self._module_web_resources_consumed = False
+
+ def register_module_web_resource(self, path: str, resource: IResource):
+ """Allows a module to register a web resource to be served at the given path.
+
+ If multiple modules register a resource for the same path, the module that
+ appears the highest in the configuration file takes priority.
+
+ Args:
+ path: The path to register the resource for.
+ resource: The resource to attach to this path.
+
+ Raises:
+ RuntimeError: A module tried to register a web resource after the HTTP
+ listeners have been started.
+ """
+ if self._module_web_resources_consumed:
+ raise RuntimeError(
+ "Tried to register a web resource from a module after startup",
+ )
+
+ # Don't register a resource that's already been registered.
+ if path not in self._module_web_resources:
+ self._module_web_resources[path] = resource
+ else:
+ logger.warning(
+ "Module tried to register a web resource for path %s but another module"
+ " has already registered a resource for this path.",
+ path,
+ )
+
def get_instance_id(self) -> str:
"""A unique ID for this synapse process instance.
@@ -495,10 +525,6 @@ class HomeServer(metaclass=abc.ABCMeta):
return E2eRoomKeysHandler(self)
@cache_in_self
- def get_acme_handler(self) -> AcmeHandler:
- return AcmeHandler(self)
-
- @cache_in_self
def get_admin_handler(self) -> AdminHandler:
return AdminHandler(self)
@@ -651,7 +677,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_spam_checker(self) -> SpamChecker:
- return SpamChecker(self)
+ return SpamChecker()
@cache_in_self
def get_third_party_event_rules(self) -> ThirdPartyEventRules:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 9ba5778a88..0e3dd4e9ca 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -62,6 +62,13 @@ class EndToEndKeyBackgroundStore(SQLBaseStore):
class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+ super().__init__(database, db_conn, hs)
+
+ self._allow_device_name_lookup_over_federation = (
+ self.hs.config.federation.allow_device_name_lookup_over_federation
+ )
+
async def get_e2e_device_keys_for_federation_query(
self, user_id: str
) -> Tuple[int, List[JsonDict]]:
@@ -85,7 +92,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
result["keys"] = keys
device_display_name = None
- if self.hs.config.allow_device_name_lookup_over_federation:
+ if self._allow_device_name_lookup_over_federation:
device_display_name = device.display_name
if device_display_name:
result["device_display_name"] = device_display_name
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ff81d5cd17..c0ea445550 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,7 @@ import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
+from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.events import EventBase
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -670,8 +671,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return dict(txn)
- async def get_max_depth_of(self, event_ids: List[str]) -> int:
- """Returns the max depth of a set of event IDs
+ async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[Optional[str], int]:
+ """Returns the event ID and depth for the event that has the max depth from a set of event IDs
Args:
event_ids: The event IDs to calculate the max depth of.
@@ -680,14 +681,53 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
table="events",
column="event_id",
iterable=event_ids,
- retcols=("depth",),
+ retcols=(
+ "event_id",
+ "depth",
+ ),
desc="get_max_depth_of",
)
if not rows:
- return 0
+ return None, 0
else:
- return max(row["depth"] for row in rows)
+ max_depth_event_id = ""
+ current_max_depth = 0
+ for row in rows:
+ if row["depth"] > current_max_depth:
+ max_depth_event_id = row["event_id"]
+ current_max_depth = row["depth"]
+
+ return max_depth_event_id, current_max_depth
+
+ async def get_min_depth_of(self, event_ids: List[str]) -> Tuple[Optional[str], int]:
+ """Returns the event ID and depth for the event that has the min depth from a set of event IDs
+
+ Args:
+ event_ids: The event IDs to calculate the min depth of.
+ """
+ rows = await self.db_pool.simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=(
+ "event_id",
+ "depth",
+ ),
+ desc="get_min_depth_of",
+ )
+
+ if not rows:
+ return None, 0
+ else:
+ min_depth_event_id = ""
+ current_min_depth = MAX_DEPTH
+ for row in rows:
+ if row["depth"] < current_min_depth:
+ min_depth_event_id = row["event_id"]
+ current_min_depth = row["depth"]
+
+ return min_depth_event_id, current_min_depth
async def get_prev_events_for_room(self, room_id: str) -> List[str]:
"""
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 5fc3bb5a7d..2796354a1f 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -90,7 +90,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
60 * 1000,
)
self.hs.get_clock().call_later(
- 1000,
+ 1,
self._count_known_servers,
)
LaterGauge(
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index dc38942bb1..051095fea9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -111,7 +111,7 @@ class _EventPersistQueueItem:
backfilled: bool
deferred: ObservableDeferred
- parent_opentracing_span_contexts: List = []
+ parent_opentracing_span_contexts: List = attr.ib(factory=list)
"""A list of opentracing spans waiting for this batch"""
opentracing_span_context: Any = None
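
Why the one-line fix above matters: a bare class-level list on an attrs class with auto_attribs becomes a single default shared by every instance, so span contexts from unrelated persist batches would leak into each other. attr.ib(factory=list) builds a fresh list per instance. A minimal demonstration with invented class names:

import attr

@attr.s(auto_attribs=True)
class Buggy:
    items: list = []  # one list object shared by every instance

@attr.s(auto_attribs=True)
class Fixed:
    items: list = attr.ib(factory=list)  # fresh list per instance

a, b = Buggy(), Buggy()
a.items.append("span")
assert b.items == ["span"]  # the shared-state bug

c, d = Fixed(), Fixed()
c.items.append("span")
assert d.items == []
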
diff --git a/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql b/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql
index 56c0ad0003..8eb2196f6a 100644
--- a/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql
+++ b/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql
@@ -13,5 +13,8 @@
* limitations under the License.
*/
-ALTER TABLE room_stats_current ADD COLUMN knocked_members INT NOT NULL DEFAULT '0';
-ALTER TABLE room_stats_historical ADD COLUMN knocked_members BIGINT NOT NULL DEFAULT '0';
\ No newline at end of file
+-- Existing rows will default to NULL, so anything reading from these tables
+-- needs to interpret NULL as 0. This is fine here as no existing rooms can have
+-- any knocked members.
+ALTER TABLE room_stats_current ADD COLUMN knocked_members INT;
+ALTER TABLE room_stats_historical ADD COLUMN knocked_members BIGINT;
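
A hedged sketch of what "interpret NULL as 0" looks like on the read side; the function and query are illustrative, not taken from this diff.

def count_knocked_members(txn, room_id: str) -> int:
    # Existing rows default to NULL, so coalesce to zero when reading.
    txn.execute(
        "SELECT COALESCE(knocked_members, 0) FROM room_stats_current"
        " WHERE room_id = ?",
        (room_id,),
    )
    row = txn.fetchone()
    return row[0] if row else 0
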
diff --git a/synapse/types.py b/synapse/types.py
index e52cd7ffd4..8d2fa00f71 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -284,14 +284,14 @@ class RoomAlias(DomainSpecificString):
@attr.s(slots=True, frozen=True, repr=False)
class RoomID(DomainSpecificString):
- """Structure representing a room id. """
+ """Structure representing a room id."""
SIGIL = "!"
@attr.s(slots=True, frozen=True, repr=False)
class EventID(DomainSpecificString):
- """Structure representing an event id. """
+ """Structure representing an event id."""
SIGIL = "$"
@@ -404,7 +404,7 @@ def map_username_to_mxid_localpart(
return username.decode("ascii")
-@attr.s(frozen=True, slots=True, cmp=False)
+@attr.s(frozen=True, slots=True, order=False)
class RoomStreamToken:
"""Tokens are positions between events. The token "s1" comes after event 1.
diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py
index cbfbd097f9..5a638c6e9a 100644
--- a/synapse/util/module_loader.py
+++ b/synapse/util/module_loader.py
@@ -51,21 +51,26 @@ def load_module(provider: dict, config_path: Iterable[str]) -> Tuple[Type, Any]:
# Load the module config. If None, pass an empty dictionary instead
module_config = provider.get("config") or {}
- try:
- provider_config = provider_class.parse_config(module_config)
- except jsonschema.ValidationError as e:
- raise json_error_to_config_error(e, itertools.chain(config_path, ("config",)))
- except ConfigError as e:
- raise _wrap_config_error(
- "Failed to parse config for module %r" % (modulename,),
- prefix=itertools.chain(config_path, ("config",)),
- e=e,
- )
- except Exception as e:
- raise ConfigError(
- "Failed to parse config for module %r" % (modulename,),
- path=itertools.chain(config_path, ("config",)),
- ) from e
+ if hasattr(provider_class, "parse_config"):
+ try:
+ provider_config = provider_class.parse_config(module_config)
+ except jsonschema.ValidationError as e:
+ raise json_error_to_config_error(
+ e, itertools.chain(config_path, ("config",))
+ )
+ except ConfigError as e:
+ raise _wrap_config_error(
+ "Failed to parse config for module %r" % (modulename,),
+ prefix=itertools.chain(config_path, ("config",)),
+ e=e,
+ )
+ except Exception as e:
+ raise ConfigError(
+ "Failed to parse config for module %r" % (modulename,),
+ path=itertools.chain(config_path, ("config",)),
+ ) from e
+ else:
+ provider_config = module_config
return provider_class, provider_config
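
With the change above, a module that defines no parse_config simply receives the raw config dict; both module classes below are invented examples.

class MinimalModule:
    def __init__(self, config: dict, api):
        # config arrives unvalidated, exactly as written in the YAML.
        self._greeting = config.get("greeting", "hello")

class ValidatingModule:
    @staticmethod
    def parse_config(config: dict) -> dict:
        # Modules that do define parse_config keep the old validation path.
        if "required_key" not in config:
            raise ValueError("required_key must be set")
        return config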