diff --git a/CHANGES.md b/CHANGES.md
index aaded5bd24..db11de0e85 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,15 +1,6 @@
Synapse 1.25.0 (2021-01-13)
===========================
-Bugfixes
---------
-
-- Fix HTTP proxy support when using a proxy that is on a blacklisted IP. Introduced in v1.25.0rc1. Contributed by @Bubu. ([\#9084](https://github.com/matrix-org/synapse/issues/9084))
-
-
-Synapse 1.25.0 (2021-01-13)
-==============================
-
Ending Support for Python 3.5 and Postgres 9.5
----------------------------------------------
@@ -27,7 +18,7 @@ Crucially, this means __we will not produce .deb packages for Debian 9 (Stretch)
The website https://endoflife.date/ has convenient summaries of the support schedules for projects like [Python](https://endoflife.date/python) and [PostgreSQL](https://endoflife.date/postgresql).
-If you are unable to upgrade your environment to a supported version of Python or Postgres, we encourage you to consider using the [Synapse Docker images](../INSTALL.md#docker-images-and-ansible-playbooks) instead.
+If you are unable to upgrade your environment to a supported version of Python or Postgres, we encourage you to consider using the [Synapse Docker images](./INSTALL.md#docker-images-and-ansible-playbooks) instead.
### Transition Period
@@ -35,9 +26,6 @@ We will make a good faith attempt to avoid breaking compatibility in all release
We intend to continue producing .deb packages for Debian 9 (Stretch) and Ubuntu 16.04 (Xenial) through the transition period.
-Synapse 1.25.0rc1 (2021-01-06)
-==============================
-
Removal warning
---------------
@@ -49,6 +37,15 @@ are deprecated and will be removed in a future release. They will be replaced by
`POST /_synapse/admin/v1/rooms/<room_id>/delete` replaces `POST /_synapse/admin/v1/purge_room` and
`POST /_synapse/admin/v1/shutdown_room/<room_id>`.
+Bugfixes
+--------
+
+- Fix HTTP proxy support when using a proxy that is on a blacklisted IP. Introduced in v1.25.0rc1. Contributed by @Bubu. ([\#9084](https://github.com/matrix-org/synapse/issues/9084))
+
+
+Synapse 1.25.0rc1 (2021-01-06)
+==============================
+
Features
--------
@@ -98,7 +95,7 @@ Improved Documentation
- Combine related media admin API docs. ([\#8839](https://github.com/matrix-org/synapse/issues/8839))
- Fix an error in the documentation for the SAML username mapping provider. ([\#8873](https://github.com/matrix-org/synapse/issues/8873))
- Clarify comments around template directories in `sample_config.yaml`. ([\#8891](https://github.com/matrix-org/synapse/issues/8891))
-- Moved instructions for database setup, adjusted heading levels and improved syntax highlighting in [INSTALL.md](../INSTALL.md). Contributed by fossterer. ([\#8987](https://github.com/matrix-org/synapse/issues/8987))
+- Move instructions for database setup, adjust heading levels and improve syntax highlighting in [INSTALL.md](../INSTALL.md). Contributed by @fossterer. ([\#8987](https://github.com/matrix-org/synapse/issues/8987))
- Update the example value of `group_creation_prefix` in the sample configuration. ([\#8992](https://github.com/matrix-org/synapse/issues/8992))
- Link the Synapse developer room to the development section in the docs. ([\#9002](https://github.com/matrix-org/synapse/issues/9002))
diff --git a/changelog.d/8868.misc b/changelog.d/8868.misc
new file mode 100644
index 0000000000..1a11e30944
--- /dev/null
+++ b/changelog.d/8868.misc
@@ -0,0 +1 @@
+Improve efficiency of large state resolutions for new rooms.
diff --git a/changelog.d/8932.feature b/changelog.d/8932.feature
new file mode 100644
index 0000000000..a1d17394d7
--- /dev/null
+++ b/changelog.d/8932.feature
@@ -0,0 +1 @@
+Remove a user's avatar URL and display name when deactivated with the Admin API.
diff --git a/changelog.d/8948.feature b/changelog.d/8948.feature
new file mode 100644
index 0000000000..3b06cbfa22
--- /dev/null
+++ b/changelog.d/8948.feature
@@ -0,0 +1 @@
+Update `/_synapse/admin/v1/users/<user_id>/joined_rooms` to work for both local and remote users.
diff --git a/changelog.d/9016.misc b/changelog.d/9016.misc
new file mode 100644
index 0000000000..0d455b17db
--- /dev/null
+++ b/changelog.d/9016.misc
@@ -0,0 +1 @@
+Ensure rejected events get added to some metadata tables.
diff --git a/changelog.d/9055.misc b/changelog.d/9055.misc
new file mode 100644
index 0000000000..8e0512eb1e
--- /dev/null
+++ b/changelog.d/9055.misc
@@ -0,0 +1 @@
+Drop unused database tables.
diff --git a/changelog.d/9059.bugfix b/changelog.d/9059.bugfix
new file mode 100644
index 0000000000..2933703ffa
--- /dev/null
+++ b/changelog.d/9059.bugfix
@@ -0,0 +1 @@
+Fix incorrect exit code when there is an error at startup.
diff --git a/changelog.d/9063.misc b/changelog.d/9063.misc
new file mode 100644
index 0000000000..22eed43147
--- /dev/null
+++ b/changelog.d/9063.misc
@@ -0,0 +1 @@
+Remove unnecessary declarations in the tests for the admin API.
\ No newline at end of file
diff --git a/changelog.d/9067.feature b/changelog.d/9067.feature
new file mode 100644
index 0000000000..01a24dcf49
--- /dev/null
+++ b/changelog.d/9067.feature
@@ -0,0 +1 @@
+Add support for multiple SSO Identity Providers.
diff --git a/changelog.d/9068.feature b/changelog.d/9068.feature
new file mode 100644
index 0000000000..cdf1844fa7
--- /dev/null
+++ b/changelog.d/9068.feature
@@ -0,0 +1 @@
+Add experimental support for handling `/keys/claim` and `/room_keys` APIs on worker processes.
diff --git a/changelog.d/9069.misc b/changelog.d/9069.misc
new file mode 100644
index 0000000000..5e9e62d252
--- /dev/null
+++ b/changelog.d/9069.misc
@@ -0,0 +1 @@
+Remove `SynapseRequest.get_user_agent`.
diff --git a/changelog.d/9070.bugfix b/changelog.d/9070.bugfix
new file mode 100644
index 0000000000..72b8fe9f1c
--- /dev/null
+++ b/changelog.d/9070.bugfix
@@ -0,0 +1 @@
+Fix `JSONDecodeError` spamming the logs when sending transactions to remote servers.
diff --git a/changelog.d/9071.bugfix b/changelog.d/9071.bugfix
new file mode 100644
index 0000000000..0201271f84
--- /dev/null
+++ b/changelog.d/9071.bugfix
@@ -0,0 +1 @@
+Fix "Failed to send request" errors when a client provides an invalid room alias.
diff --git a/changelog.d/9080.misc b/changelog.d/9080.misc
new file mode 100644
index 0000000000..3da8171f5f
--- /dev/null
+++ b/changelog.d/9080.misc
@@ -0,0 +1 @@
+Remove redundant `Homeserver.get_ip_from_request` method.
diff --git a/changelog.d/9081.feature b/changelog.d/9081.feature
new file mode 100644
index 0000000000..01a24dcf49
--- /dev/null
+++ b/changelog.d/9081.feature
@@ -0,0 +1 @@
+Add support for multiple SSO Identity Providers.
diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst
index e4d6f8203b..b3d413cf57 100644
--- a/docs/admin_api/user_admin_api.rst
+++ b/docs/admin_api/user_admin_api.rst
@@ -98,6 +98,8 @@ Body parameters:
- ``deactivated``, optional. If unspecified, deactivation state will be left
unchanged on existing accounts and set to ``false`` for new accounts.
+ A user cannot be erased by deactivating with this API. For details on deactivating users see
+ `Deactivate Account <#deactivate-account>`_.
If the user already exists then optional parameters default to the current value.
@@ -248,6 +250,25 @@ server admin: see `README.rst <README.rst>`_.
The erase parameter is optional and defaults to ``false``.
An empty body may be passed for backwards compatibility.
+The following actions are performed when deactivating a user:
+
+- Try to unbind 3PIDs from the identity server
+- Remove all 3PIDs from the homeserver
+- Delete all devices and E2EE keys
+- Delete all access tokens
+- Delete the password hash
+- Remove the user from all rooms they are a member of
+- Remove the user from the user directory
+- Reject all pending invites
+- Remove all account validity information related to the user
+
+The following additional actions are performed during deactivation if ``erase``
+is set to ``true``:
+
+- Remove the user's display name
+- Remove the user's avatar URL
+- Mark the user as erased
+
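+For example, deactivating and erasing a user from a Python script might look
+like the following sketch (the homeserver URL, user ID and admin access token
+are placeholders):
+
+.. code:: python
+
+    import requests
+
+    resp = requests.post(
+        "http://localhost:8008/_synapse/admin/v1/deactivate/@badactor:example.com",
+        headers={"Authorization": "Bearer <access_token>"},
+        json={"erase": True},
+    )
+    resp.raise_for_status()
+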
Reset password
==============
@@ -337,6 +358,10 @@ A response body like the following is returned:
"total": 2
}
+The server returns a list of rooms of which the user and the server
+are members. If the user is local, all of the rooms of which the user
+is a member are returned.
+
**Parameters**
The following parameters should be set in the URL:
diff --git a/docs/auth_chain_diff.dot b/docs/auth_chain_diff.dot
new file mode 100644
index 0000000000..978d579ada
--- /dev/null
+++ b/docs/auth_chain_diff.dot
@@ -0,0 +1,32 @@
+digraph auth {
+ nodesep=0.5;
+ rankdir="RL";
+
+ C [label="Create (1,1)"];
+
+ BJ [label="Bob's Join (2,1)", color=red];
+ BJ2 [label="Bob's Join (2,2)", color=red];
+ BJ2 -> BJ [color=red, dir=none];
+
+ subgraph cluster_foo {
+ A1 [label="Alice's invite (4,1)", color=blue];
+ A2 [label="Alice's Join (4,2)", color=blue];
+ A3 [label="Alice's Join (4,3)", color=blue];
+ A3 -> A2 -> A1 [color=blue, dir=none];
+ color=none;
+ }
+
+ PL1 [label="Power Level (3,1)", color=darkgreen];
+ PL2 [label="Power Level (3,2)", color=darkgreen];
+ PL2 -> PL1 [color=darkgreen, dir=none];
+
+ {rank = same; C; BJ; PL1; A1;}
+
+ A1 -> C [color=grey];
+ A1 -> BJ [color=grey];
+ PL1 -> C [color=grey];
+ BJ2 -> PL1 [penwidth=2];
+
+ A3 -> PL2 [penwidth=2];
+ A1 -> PL1 -> BJ -> C [penwidth=2];
+}
diff --git a/docs/auth_chain_diff.dot.png b/docs/auth_chain_diff.dot.png
new file mode 100644
index 0000000000..771c07308f
--- /dev/null
+++ b/docs/auth_chain_diff.dot.png
Binary files differ
diff --git a/docs/auth_chain_difference_algorithm.md b/docs/auth_chain_difference_algorithm.md
new file mode 100644
index 0000000000..30f72a70da
--- /dev/null
+++ b/docs/auth_chain_difference_algorithm.md
@@ -0,0 +1,108 @@
+# Auth Chain Difference Algorithm
+
+The auth chain difference algorithm is used by V2 state resolution, where a
+naive implementation can be a significant source of CPU and DB usage.
+
+## Definitions
+
+A *state set* is a set of state events; e.g. the input of a state resolution
+algorithm is a collection of state sets.
+
+The *auth chain* of a set of events is all the events' auth events and *their*
+auth events, recursively (i.e. the events reachable by walking the graph induced
+by the events' auth event links).
+
+The *auth chain difference* of a collection of state sets is the union minus the
+intersection of the sets of auth chains corresponding to the state sets, i.e. an
+event is in the auth chain difference if it is reachable by walking the auth
+event graph from at least one of the state sets but not from *all* of the state
+sets.
+
+## Breadth First Walk Algorithm
+
+A way of calculating the auth chain difference without calculating the full auth
+chains for each state set is to do a parallel breadth first walk (ordered by
+depth) of each state set's auth chain. By tracking which events are reachable
+from each state set we can finish early if every pending event is reachable from
+every state set.
+
+This can work well for state sets that have a small auth chain difference, but
+can be very inefficient for larger differences. However, this algorithm is still
+used if we don't have a chain cover index for the room (e.g. because we're in
+the process of indexing it).
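+
+As an illustration only (not Synapse's implementation; the ordering by depth
+and the early-exit optimisation described above are omitted, and
+`get_auth_events(event_id)` is an assumed lookup of an event's auth event
+IDs), such a walk might look like:
+
+```python
+from collections import deque
+
+def auth_chain_difference_walk(state_sets, get_auth_events):
+    # `state_sets` is a list of iterables of event IDs.
+    all_sets = frozenset(range(len(state_sets)))
+    # Which state sets can reach each auth-chain event.
+    reachable = {}  # event ID -> set of state-set indices
+    queue = deque(
+        (event_id, i)
+        for i, state_set in enumerate(state_sets)
+        for event_id in state_set
+    )
+    while queue:
+        event_id, i = queue.popleft()
+        for auth_id in get_auth_events(event_id):
+            seen = reachable.setdefault(auth_id, set())
+            if i not in seen:
+                seen.add(i)
+                queue.append((auth_id, i))
+    # Union minus intersection: reachable from some, but not all, state sets.
+    return {e for e, seen in reachable.items() if seen != all_sets}
+```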
+
+## Chain Cover Index
+
+Synapse computes auth chain differences by pre-computing a "chain cover" index
+for the auth chain in a room, allowing efficient reachability queries like "is
+event A in the auth chain of event B". This is done by assigning every event a
+*chain ID* and *sequence number* (e.g. `(5,3)`), and having a map of *links*
+between chains (e.g. `(5,3) -> (2,4)`) such that A is reachable by B (i.e. `A`
+is in the auth chain of `B`) if and only if either:
+
+1. A and B have the same chain ID and `A`'s sequence number is less than `B`'s
+ sequence number; or
+2. there is a link `L` between `B`'s chain ID and `A`'s chain ID such that
+ `L.start_seq_no` <= `B.seq_no` and `A.seq_no` <= `L.end_seq_no`.
+
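+A minimal sketch of this reachability test (an illustration only, not part of
+Synapse itself; representing the links map as
+`(B's chain ID, A's chain ID) -> (start_seq_no, end_seq_no)` pairs is an
+assumption) might look like:
+
+```python
+def is_in_auth_chain(a, b, links):
+    # a and b are (chain_id, seq_no) pairs; `links` maps
+    # (b_chain_id, a_chain_id) -> iterable of (start_seq_no, end_seq_no).
+    a_chain, a_seq = a
+    b_chain, b_seq = b
+    if a_chain == b_chain:
+        return a_seq < b_seq
+    return any(
+        start <= b_seq and a_seq <= end
+        for start, end in links.get((b_chain, a_chain), ())
+    )
+```
+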
+There are actually two potential implementations, one where we store links from
+each chain to every other reachable chain (the transitive closure of the links
+graph), and one where we remove redundant links (the transitive reduction of the
+links graph), e.g. if we have chains `C3 -> C2 -> C1` then the link `C3 -> C1`
+would not be stored. Synapse uses the former implementation so that it doesn't
+need to recurse to test reachability between chains.
+
+### Example
+
+An example auth graph would look like the following, where chains have been
+formed based on type/state_key and are denoted by colour and are labelled with
+`(chain ID, sequence number)`. Links are denoted by the arrows (links in grey
+are those that would be removed in the second implementation described above).
+
+![Example auth graph](auth_chain_diff.dot.png)
+
+Note that we don't include all links between events and their auth events, as
+most of those links would be redundant. For example, all events point to the
+create event, but each chain only needs the one link from its base to the
+create event.
+
+## Using the Index
+
+This index can be used to calculate the auth chain difference of the state sets
+by looking at the chain ID and sequence numbers reachable from each state set:
+
+1. For every state set, look up the chain ID/sequence numbers of each state event.
+2. Use the index to find all chains and the maximum sequence number reachable
+ from each state set.
+3. The auth chain difference is then all events in each chain that have sequence
+ numbers between the maximum sequence number reachable from *any* state set and
+ the minimum reachable by *all* state sets (if any).
+
+Note that step 2 is effectively calculating the auth chain for each state set
+(in terms of chain IDs and sequence numbers), and step 3 is calculating the
+difference between the union and intersection of the auth chains.
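+
+As a rough sketch (an illustration only, not the actual implementation), steps
+2 and 3 could be expressed as follows, where each element of
+`state_set_reachables` is an assumed map of chain ID to the maximum sequence
+number reachable from that state set:
+
+```python
+def chain_cover_difference(state_set_reachables):
+    all_chains = set().union(*state_set_reachables)
+    diff = {}
+    for chain_id in all_chains:
+        seqs = [r.get(chain_id, 0) for r in state_set_reachables]
+        lo, hi = min(seqs), max(seqs)
+        if hi > lo:
+            # Sequence numbers in (lo, hi] are reachable from at least one
+            # state set but not from all of them.
+            diff[chain_id] = (lo, hi)
+    return diff
+```
+
+With the worked example below, this returns the range `(1, 2]` for chains 2
+and 3 and `(1, 3]` for chain 4.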
+
+### Worked Example
+
+For example, given the above graph, we can calculate the difference between
+state sets consisting of:
+
+1. `S1`: Alice's invite `(4,1)` and Bob's second join `(2,2)`; and
+2. `S2`: Alice's second join `(4,3)` and Bob's first join `(2,1)`.
+
+Using the index we see that the following auth chains are reachable from each
+state set:
+
+1. `S1`: `(1,1)`, `(2,2)`, `(3,1)` & `(4,1)`
+2. `S2`: `(1,1)`, `(2,1)`, `(3,2)` & `(4,3)`
+
+And so, for each chain, the ranges that are in the auth chain difference are:
+1. Chain 1: None (since everything can reach the create event).
+2. Chain 2: The range `(1, 2]` (i.e. just `2`), as `1` is reachable by all state
+ sets and the maximum reachable is `2` (corresponding to Bob's second join).
+3. Chain 3: Similarly the range `(1, 2]` (corresponding to the second power
+ level).
+4. Chain 4: The range `(1, 3]` (corresponding to both of Alice's joins).
+
+So the final result is: Bob's second join `(2,2)`, the second power level
+`(3,2)` and both of Alice's joins `(4,2)` & `(4,3)`.
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 70b76238b4..b575d85976 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.appservice import ApplicationService
from synapse.events import EventBase
+from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing as opentracing
from synapse.storage.databases.main.registration import TokenLookupResult
@@ -186,8 +187,8 @@ class Auth:
AuthError if access is denied for the user in the access token
"""
try:
- ip_addr = self.hs.get_ip_from_request(request)
- user_agent = request.get_user_agent("")
+ ip_addr = request.getClientIP()
+ user_agent = get_request_user_agent(request)
access_token = self.get_access_token_from_request(request)
@@ -276,7 +277,7 @@ class Auth:
return None, None
if app_service.ip_range_whitelist:
- ip_address = IPAddress(self.hs.get_ip_from_request(request))
+ ip_address = IPAddress(request.getClientIP())
if ip_address not in app_service.ip_range_whitelist:
return None, None
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 37ecdbe3d8..395e202b89 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +20,7 @@ import signal
import socket
import sys
import traceback
-from typing import Iterable
+from typing import Awaitable, Callable, Iterable
from typing_extensions import NoReturn
@@ -143,6 +144,45 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)
+def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+ """Register a callback with the reactor, to be called once it is running
+
+ This can be used to initialise parts of the system which require an asynchronous
+ setup.
+
+ Any exception raised by the callback will be printed and logged, and the process
+ will exit.
+ """
+
+ async def wrapper():
+ try:
+ await cb(*args, **kwargs)
+ except Exception:
+ # previously, we used Failure().printTraceback() here, in the hope that
+ # would give better tracebacks than traceback.print_exc(). However, that
+ # doesn't handle chained exceptions (with a __cause__ or __context__) well,
+ # and I *think* the need for Failure() is reduced now that we mostly use
+ # async/await.
+
+ # Write the exception to both the logs *and* the unredirected stderr,
+ # because people tend to get confused if it only goes to one or the other.
+ #
+ # One problem with this is that if people are using a logging config that
+ # logs to the console (as is common eg under docker), they will get two
+ # copies of the exception. We could maybe try to detect that, but it's
+ # probably a cost we can bear.
+ logger.fatal("Error during startup", exc_info=True)
+ print("Error during startup:", file=sys.__stderr__)
+ traceback.print_exc(file=sys.__stderr__)
+
+ # it's no use calling sys.exit here, since that just raises a SystemExit
+ # exception which is then caught by the reactor, and everything carries
+ # on as normal.
+ os._exit(1)
+
+ reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
+
+
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
@@ -227,7 +267,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")
-def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
"""
Start a Synapse server or worker.
@@ -241,75 +281,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
hs: homeserver instance
listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
- try:
- # Set up the SIGHUP machinery.
- if hasattr(signal, "SIGHUP"):
+ # Set up the SIGHUP machinery.
+ if hasattr(signal, "SIGHUP"):
+ reactor = hs.get_reactor()
- reactor = hs.get_reactor()
+ @wrap_as_background_process("sighup")
+ def handle_sighup(*args, **kwargs):
+ # Tell systemd our state, if we're using it. This will silently fail if
+ # we're not using systemd.
+ sdnotify(b"RELOADING=1")
- @wrap_as_background_process("sighup")
- def handle_sighup(*args, **kwargs):
- # Tell systemd our state, if we're using it. This will silently fail if
- # we're not using systemd.
- sdnotify(b"RELOADING=1")
+ for i, args, kwargs in _sighup_callbacks:
+ i(*args, **kwargs)
- for i, args, kwargs in _sighup_callbacks:
- i(*args, **kwargs)
+ sdnotify(b"READY=1")
- sdnotify(b"READY=1")
+ # We defer running the sighup handlers until next reactor tick. This
+ # is so that we're in a sane state, e.g. flushing the logs may fail
+ # if the sighup happens in the middle of writing a log entry.
+ def run_sighup(*args, **kwargs):
+ # `callFromThread` should be "signal safe" as well as thread
+ # safe.
+ reactor.callFromThread(handle_sighup, *args, **kwargs)
- # We defer running the sighup handlers until next reactor tick. This
- # is so that we're in a sane state, e.g. flushing the logs may fail
- # if the sighup happens in the middle of writing a log entry.
- def run_sighup(*args, **kwargs):
- # `callFromThread` should be "signal safe" as well as thread
- # safe.
- reactor.callFromThread(handle_sighup, *args, **kwargs)
+ signal.signal(signal.SIGHUP, run_sighup)
- signal.signal(signal.SIGHUP, run_sighup)
+ register_sighup(refresh_certificate, hs)
- register_sighup(refresh_certificate, hs)
+ # Load the certificate from disk.
+ refresh_certificate(hs)
- # Load the certificate from disk.
- refresh_certificate(hs)
+ # Start the tracer
+ synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
+ hs
+ )
- # Start the tracer
- synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
- hs
- )
+ # It is now safe to start your Synapse.
+ hs.start_listening(listeners)
+ hs.get_datastore().db_pool.start_profiling()
+ hs.get_pusherpool().start()
+
+ # Log when we start the shut down process.
+ hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", logger.info, "Shutting down..."
+ )
- # It is now safe to start your Synapse.
- hs.start_listening(listeners)
- hs.get_datastore().db_pool.start_profiling()
- hs.get_pusherpool().start()
+ setup_sentry(hs)
+ setup_sdnotify(hs)
- # Log when we start the shut down process.
- hs.get_reactor().addSystemEventTrigger(
- "before", "shutdown", logger.info, "Shutting down..."
- )
+ # If background tasks are running on the main process, start collecting the
+ # phone home stats.
+ if hs.config.run_background_tasks:
+ start_phone_stats_home(hs)
- setup_sentry(hs)
- setup_sdnotify(hs)
-
- # If background tasks are running on the main process, start collecting the
- # phone home stats.
- if hs.config.run_background_tasks:
- start_phone_stats_home(hs)
-
- # We now freeze all allocated objects in the hopes that (almost)
- # everything currently allocated are things that will be used for the
- # rest of time. Doing so means less work each GC (hopefully).
- #
- # This only works on Python 3.7
- if sys.version_info >= (3, 7):
- gc.collect()
- gc.freeze()
- except Exception:
- traceback.print_exc(file=sys.stderr)
- reactor = hs.get_reactor()
- if reactor.running:
- reactor.stop()
- sys.exit(1)
+ # We now freeze all allocated objects in the hopes that (almost)
+ # everything currently allocated are things that will be used for the
+ # rest of time. Doing so means less work each GC (hopefully).
+ #
+ # This only works on Python 3.7
+ if sys.version_info >= (3, 7):
+ gc.collect()
+ gc.freeze()
def setup_sentry(hs):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 038f698801..b413a770d0 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager
-from twisted.internet import address, reactor
+from twisted.internet import address
import synapse
import synapse.events
@@ -34,6 +34,7 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
+from synapse.app._base import register_start
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -99,14 +100,18 @@ from synapse.rest.client.v1.profile import (
)
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.voip import VoipRestServlet
-from synapse.rest.client.v2_alpha import groups, sync, user_directory
+from synapse.rest.client.v2_alpha import groups, room_keys, sync, user_directory
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
-from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
+from synapse.rest.client.v2_alpha.keys import (
+ KeyChangesServlet,
+ KeyQueryServlet,
+ OneTimeKeyServlet,
+)
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
@@ -115,6 +120,7 @@ from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
+from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -446,6 +452,7 @@ class GenericWorkerSlavedStore(
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
+ EndToEndRoomKeyStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
@@ -503,6 +510,7 @@ class GenericWorkerServer(HomeServer):
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
+ OneTimeKeyServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
VoipRestServlet(self).register(resource)
PushRuleRestServlet(self).register(resource)
@@ -520,6 +528,7 @@ class GenericWorkerServer(HomeServer):
room.register_servlets(self, resource, True)
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
+ room_keys.register_servlets(self, resource)
SendToDeviceRestServlet(self).register(resource)
@@ -960,9 +969,7 @@ def start(config_options):
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
- reactor.addSystemEventTrigger(
- "before", "startup", _base.start, hs, config.worker_listeners
- )
+ register_start(_base.start, hs, config.worker_listeners)
_base.start_worker_reactor("synapse-generic-worker", config)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 42b5dc53d7..cbecf23be6 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,8 +20,7 @@ import os
import sys
from typing import Iterable, Iterator
-from twisted.internet import defer, reactor
-from twisted.python.failure import Failure
+from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -38,7 +37,7 @@ from synapse.api.urls import (
WEB_CLIENT_PREFIX,
)
from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
from synapse.config._base import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
@@ -414,40 +413,29 @@ def setup(config_options):
_base.refresh_certificate(hs)
async def start():
- try:
- # Run the ACME provisioning code, if it's enabled.
- if hs.config.acme_enabled:
- acme = hs.get_acme_handler()
- # Start up the webservices which we will respond to ACME
- # challenges with, and then provision.
- await acme.start_listening()
- await do_acme()
-
- # Check if it needs to be reprovisioned every day.
- hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
-
- # Load the OIDC provider metadatas, if OIDC is enabled.
- if hs.config.oidc_enabled:
- oidc = hs.get_oidc_handler()
- # Loading the provider metadata also ensures the provider config is valid.
- await oidc.load_metadata()
- await oidc.load_jwks()
-
- _base.start(hs, config.listeners)
-
- hs.get_datastore().db_pool.updates.start_doing_background_updates()
- except Exception:
- # Print the exception and bail out.
- print("Error during startup:", file=sys.stderr)
-
- # this gives better tracebacks than traceback.print_exc()
- Failure().printTraceback(file=sys.stderr)
-
- if reactor.running:
- reactor.stop()
- sys.exit(1)
-
- reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
+ # Run the ACME provisioning code, if it's enabled.
+ if hs.config.acme_enabled:
+ acme = hs.get_acme_handler()
+ # Start up the webservices which we will respond to ACME
+ # challenges with, and then provision.
+ await acme.start_listening()
+ await do_acme()
+
+ # Check if it needs to be reprovisioned every day.
+ hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
+
+ # Load the OIDC provider metadatas, if OIDC is enabled.
+ if hs.config.oidc_enabled:
+ oidc = hs.get_oidc_handler()
+ # Loading the provider metadata also ensures the provider config is valid.
+ await oidc.load_metadata()
+ await oidc.load_jwks()
+
+ await _base.start(hs, config.listeners)
+
+ hs.get_datastore().db_pool.updates.start_doing_background_updates()
+
+ register_start(start)
return hs
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7309884226..178d50df90 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -49,8 +49,13 @@ from synapse.api.errors import (
UserDeactivatedError,
)
from synapse.api.ratelimiting import Ratelimiter
-from synapse.handlers.ui_auth import INTERACTIVE_AUTH_CHECKERS
+from synapse.handlers._base import BaseHandler
+from synapse.handlers.ui_auth import (
+ INTERACTIVE_AUTH_CHECKERS,
+ UIAuthSessionDataConstants,
+)
from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker
+from synapse.http import get_request_user_agent
from synapse.http.server import finish_request, respond_with_html
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
@@ -62,8 +67,6 @@ from synapse.util.async_helpers import maybe_awaitable
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.threepids import canonicalise_email
-from ._base import BaseHandler
-
if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer
@@ -284,7 +287,6 @@ class AuthHandler(BaseHandler):
requester: Requester,
request: SynapseRequest,
request_body: Dict[str, Any],
- clientip: str,
description: str,
) -> Tuple[dict, Optional[str]]:
"""
@@ -301,8 +303,6 @@ class AuthHandler(BaseHandler):
request_body: The body of the request sent by the client
- clientip: The IP address of the client.
-
description: A human readable string to be displayed to the user that
describes the operation happening on their account.
@@ -338,10 +338,10 @@ class AuthHandler(BaseHandler):
request_body.pop("auth", None)
return request_body, None
- user_id = requester.user.to_string()
+ requester_user_id = requester.user.to_string()
# Check if we should be ratelimited due to too many previous failed attempts
- self._failed_uia_attempts_ratelimiter.ratelimit(user_id, update=False)
+ self._failed_uia_attempts_ratelimiter.ratelimit(requester_user_id, update=False)
# build a list of supported flows
supported_ui_auth_types = await self._get_available_ui_auth_types(
@@ -349,13 +349,16 @@ class AuthHandler(BaseHandler):
)
flows = [[login_type] for login_type in supported_ui_auth_types]
+ def get_new_session_data() -> JsonDict:
+ return {UIAuthSessionDataConstants.REQUEST_USER_ID: requester_user_id}
+
try:
result, params, session_id = await self.check_ui_auth(
- flows, request, request_body, clientip, description
+ flows, request, request_body, description, get_new_session_data,
)
except LoginError:
# Update the ratelimiter to say we failed (`can_do_action` doesn't raise).
- self._failed_uia_attempts_ratelimiter.can_do_action(user_id)
+ self._failed_uia_attempts_ratelimiter.can_do_action(requester_user_id)
raise
# find the completed login type
@@ -363,14 +366,14 @@ class AuthHandler(BaseHandler):
if login_type not in result:
continue
- user_id = result[login_type]
+ validated_user_id = result[login_type]
break
else:
# this can't happen
raise Exception("check_auth returned True but no successful login type")
# check that the UI auth matched the access token
- if user_id != requester.user.to_string():
+ if validated_user_id != requester_user_id:
raise AuthError(403, "Invalid auth")
# Note that the access token has been validated.
@@ -402,13 +405,9 @@ class AuthHandler(BaseHandler):
# if sso is enabled, allow the user to log in via SSO iff they have a mapping
# from sso to mxid.
- if self.hs.config.saml2.saml2_enabled or self.hs.config.oidc.oidc_enabled:
- if await self.store.get_external_ids_by_user(user.to_string()):
- ui_auth_types.add(LoginType.SSO)
-
- # Our CAS impl does not (yet) correctly register users in user_external_ids,
- # so always offer that if it's available.
- if self.hs.config.cas.cas_enabled:
+ if await self.hs.get_sso_handler().get_identity_providers_for_user(
+ user.to_string()
+ ):
ui_auth_types.add(LoginType.SSO)
return ui_auth_types
@@ -426,8 +425,8 @@ class AuthHandler(BaseHandler):
flows: List[List[str]],
request: SynapseRequest,
clientdict: Dict[str, Any],
- clientip: str,
description: str,
+ get_new_session_data: Optional[Callable[[], JsonDict]] = None,
) -> Tuple[dict, dict, str]:
"""
Takes a dictionary sent by the client in the login / registration
@@ -448,11 +447,16 @@ class AuthHandler(BaseHandler):
clientdict: The dictionary from the client root level, not the
'auth' key: this method prompts for auth if none is sent.
- clientip: The IP address of the client.
-
description: A human readable string to be displayed to the user that
describes the operation happening on their account.
+ get_new_session_data:
+ an optional callback which will be called when starting a new session.
+ it should return data to be stored as part of the session.
+
+ The keys of the returned data should be entries in
+ UIAuthSessionDataConstants.
+
Returns:
A tuple of (creds, params, session_id).
@@ -480,10 +484,15 @@ class AuthHandler(BaseHandler):
# If there's no session ID, create a new session.
if not sid:
+ new_session_data = get_new_session_data() if get_new_session_data else {}
+
session = await self.store.create_ui_auth_session(
clientdict, uri, method, description
)
+ for k, v in new_session_data.items():
+ await self.set_session_data(session.session_id, k, v)
+
else:
try:
session = await self.store.get_ui_auth_session(sid)
@@ -539,7 +548,8 @@ class AuthHandler(BaseHandler):
# authentication flow.
await self.store.set_ui_auth_clientdict(sid, clientdict)
- user_agent = request.get_user_agent("")
+ user_agent = get_request_user_agent(request)
+ clientip = request.getClientIP()
await self.store.add_user_agent_ip_to_ui_auth_session(
session.session_id, user_agent, clientip
@@ -634,7 +644,8 @@ class AuthHandler(BaseHandler):
Args:
session_id: The ID of this session as returned from check_auth
- key: The key to store the data under
+ key: The key to store the data under. An entry from
+ UIAuthSessionDataConstants.
value: The data to store
"""
try:
@@ -650,7 +661,8 @@ class AuthHandler(BaseHandler):
Args:
session_id: The ID of this session as returned from check_auth
- key: The key to store the data under
+ key: The key the data was stored under. An entry from
+ UIAuthSessionDataConstants.
default: Value to return if the key has not been set
"""
try:
@@ -1324,12 +1336,12 @@ class AuthHandler(BaseHandler):
else:
return False
- async def start_sso_ui_auth(self, redirect_url: str, session_id: str) -> str:
+ async def start_sso_ui_auth(self, request: SynapseRequest, session_id: str) -> str:
"""
Get the HTML for the SSO redirect confirmation page.
Args:
- redirect_url: The URL to redirect to the SSO provider.
+ request: The incoming HTTP request
session_id: The user interactive authentication session ID.
Returns:
@@ -1339,6 +1351,35 @@ class AuthHandler(BaseHandler):
session = await self.store.get_ui_auth_session(session_id)
except StoreError:
raise SynapseError(400, "Unknown session ID: %s" % (session_id,))
+
+ user_id_to_verify = await self.get_session_data(
+ session_id, UIAuthSessionDataConstants.REQUEST_USER_ID
+ ) # type: str
+
+ idps = await self.hs.get_sso_handler().get_identity_providers_for_user(
+ user_id_to_verify
+ )
+
+ if not idps:
+ # we checked that the user had some remote identities before offering an SSO
+ # flow, so either it's been deleted or the client has requested SSO despite
+ # it not being offered.
+ raise SynapseError(400, "User has no SSO identities")
+
+ # for now, just pick one
+ idp_id, sso_auth_provider = next(iter(idps.items()))
+        if len(idps) > 1:
+ logger.warning(
+ "User %r has previously logged in with multiple SSO IdPs; arbitrarily "
+ "picking %r",
+ user_id_to_verify,
+ idp_id,
+ )
+
+ redirect_url = await sso_auth_provider.handle_redirect_request(
+ request, None, session_id
+ )
+
return self._sso_auth_confirm_template.render(
description=session.description, redirect_url=redirect_url,
)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 366787ca33..ac25e3e94f 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Optional
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID, create_requester
+from synapse.types import Requester, UserID, create_requester
from ._base import BaseHandler
@@ -53,16 +53,23 @@ class DeactivateAccountHandler(BaseHandler):
self._account_validity_enabled = hs.config.account_validity_enabled
async def deactivate_account(
- self, user_id: str, erase_data: bool, id_server: Optional[str] = None
+ self,
+ user_id: str,
+ erase_data: bool,
+ requester: Requester,
+ id_server: Optional[str] = None,
+ by_admin: bool = False,
) -> bool:
"""Deactivate a user's account
Args:
user_id: ID of user to be deactivated
erase_data: whether to GDPR-erase the user's data
+ requester: The user attempting to make this change.
id_server: Use the given identity server when unbinding
any threepids. If None then will attempt to unbind using the
identity server specified when binding (if known).
+ by_admin: Whether this change was made by an administrator.
Returns:
True if identity server supports removing threepids, otherwise False.
@@ -125,6 +132,12 @@ class DeactivateAccountHandler(BaseHandler):
# Mark the user as erased, if they asked for that
if erase_data:
+ user = UserID.from_string(user_id)
+ # Remove avatar URL from this user
+ await self._profile_handler.set_avatar_url(user, requester, "", by_admin)
+ # Remove displayname from this user
+ await self._profile_handler.set_displayname(user, requester, "", by_admin)
+
logger.info("Marking %s as erased", user_id)
await self.store.mark_user_erased(user_id)
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 6835c6c462..88097639ef 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -14,7 +14,7 @@
# limitations under the License.
import inspect
import logging
-from typing import TYPE_CHECKING, Dict, Generic, List, Optional, Tuple, TypeVar
+from typing import TYPE_CHECKING, Dict, Generic, List, Optional, TypeVar
from urllib.parse import urlencode
import attr
@@ -35,7 +35,6 @@ from typing_extensions import TypedDict
from twisted.web.client import readBody
from synapse.config import ConfigError
-from synapse.handlers._base import BaseHandler
from synapse.handlers.sso import MappingException, UserAttributes
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
@@ -85,12 +84,15 @@ class OidcError(Exception):
return self.error
-class OidcHandler(BaseHandler):
+class OidcHandler:
"""Handles requests related to the OpenID Connect login flow.
"""
def __init__(self, hs: "HomeServer"):
- super().__init__(hs)
+ self._store = hs.get_datastore()
+
+ self._token_generator = OidcSessionTokenGenerator(hs)
+
self._callback_url = hs.config.oidc_callback_url # type: str
self._scopes = hs.config.oidc_scopes # type: List[str]
self._user_profile_method = hs.config.oidc_user_profile_method # type: str
@@ -116,7 +118,6 @@ class OidcHandler(BaseHandler):
self._http_client = hs.get_proxied_http_client()
self._server_name = hs.config.server_name # type: str
- self._macaroon_secret_key = hs.config.macaroon_secret_key
# identifier for the external_ids table
self.idp_id = "oidc"
@@ -519,11 +520,13 @@ class OidcHandler(BaseHandler):
if not client_redirect_url:
client_redirect_url = b""
- cookie = self._generate_oidc_session_token(
+ cookie = self._token_generator.generate_oidc_session_token(
state=state,
- nonce=nonce,
- client_redirect_url=client_redirect_url.decode(),
- ui_auth_session_id=ui_auth_session_id,
+ session_data=OidcSessionData(
+ nonce=nonce,
+ client_redirect_url=client_redirect_url.decode(),
+ ui_auth_session_id=ui_auth_session_id,
+ ),
)
request.addCookie(
SESSION_COOKIE_NAME,
@@ -628,11 +631,9 @@ class OidcHandler(BaseHandler):
# Deserialize the session token and verify it.
try:
- (
- nonce,
- client_redirect_url,
- ui_auth_session_id,
- ) = self._verify_oidc_session_token(session, state)
+ session_data = self._token_generator.verify_oidc_session_token(
+ session, state
+ )
except MacaroonDeserializationException as e:
logger.exception("Invalid session")
self._sso_handler.render_error(request, "invalid_session", str(e))
@@ -674,14 +675,14 @@ class OidcHandler(BaseHandler):
else:
logger.debug("Extracting userinfo from id_token")
try:
- userinfo = await self._parse_id_token(token, nonce=nonce)
+ userinfo = await self._parse_id_token(token, nonce=session_data.nonce)
except Exception as e:
logger.exception("Invalid id_token")
self._sso_handler.render_error(request, "invalid_token", str(e))
return
# first check if we're doing a UIA
- if ui_auth_session_id:
+ if session_data.ui_auth_session_id:
try:
remote_user_id = self._remote_id_from_userinfo(userinfo)
except Exception as e:
@@ -690,7 +691,7 @@ class OidcHandler(BaseHandler):
return
return await self._sso_handler.complete_sso_ui_auth_request(
- self.idp_id, remote_user_id, ui_auth_session_id, request
+ self.idp_id, remote_user_id, session_data.ui_auth_session_id, request
)
# otherwise, it's a login
@@ -698,133 +699,12 @@ class OidcHandler(BaseHandler):
# Call the mapper to register/login the user
try:
await self._complete_oidc_login(
- userinfo, token, request, client_redirect_url
+ userinfo, token, request, session_data.client_redirect_url
)
except MappingException as e:
logger.exception("Could not map user")
self._sso_handler.render_error(request, "mapping_error", str(e))
- def _generate_oidc_session_token(
- self,
- state: str,
- nonce: str,
- client_redirect_url: str,
- ui_auth_session_id: Optional[str],
- duration_in_ms: int = (60 * 60 * 1000),
- ) -> str:
- """Generates a signed token storing data about an OIDC session.
-
- When Synapse initiates an authorization flow, it creates a random state
- and a random nonce. Those parameters are given to the provider and
- should be verified when the client comes back from the provider.
- It is also used to store the client_redirect_url, which is used to
- complete the SSO login flow.
-
- Args:
- state: The ``state`` parameter passed to the OIDC provider.
- nonce: The ``nonce`` parameter passed to the OIDC provider.
- client_redirect_url: The URL the client gave when it initiated the
- flow.
- ui_auth_session_id: The session ID of the ongoing UI Auth (or
- None if this is a login).
- duration_in_ms: An optional duration for the token in milliseconds.
- Defaults to an hour.
-
- Returns:
- A signed macaroon token with the session information.
- """
- macaroon = pymacaroons.Macaroon(
- location=self._server_name, identifier="key", key=self._macaroon_secret_key,
- )
- macaroon.add_first_party_caveat("gen = 1")
- macaroon.add_first_party_caveat("type = session")
- macaroon.add_first_party_caveat("state = %s" % (state,))
- macaroon.add_first_party_caveat("nonce = %s" % (nonce,))
- macaroon.add_first_party_caveat(
- "client_redirect_url = %s" % (client_redirect_url,)
- )
- if ui_auth_session_id:
- macaroon.add_first_party_caveat(
- "ui_auth_session_id = %s" % (ui_auth_session_id,)
- )
- now = self.clock.time_msec()
- expiry = now + duration_in_ms
- macaroon.add_first_party_caveat("time < %d" % (expiry,))
-
- return macaroon.serialize()
-
- def _verify_oidc_session_token(
- self, session: bytes, state: str
- ) -> Tuple[str, str, Optional[str]]:
- """Verifies and extract an OIDC session token.
-
- This verifies that a given session token was issued by this homeserver
- and extract the nonce and client_redirect_url caveats.
-
- Args:
- session: The session token to verify
- state: The state the OIDC provider gave back
-
- Returns:
- The nonce, client_redirect_url, and ui_auth_session_id for this session
- """
- macaroon = pymacaroons.Macaroon.deserialize(session)
-
- v = pymacaroons.Verifier()
- v.satisfy_exact("gen = 1")
- v.satisfy_exact("type = session")
- v.satisfy_exact("state = %s" % (state,))
- v.satisfy_general(lambda c: c.startswith("nonce = "))
- v.satisfy_general(lambda c: c.startswith("client_redirect_url = "))
- # Sometimes there's a UI auth session ID, it seems to be OK to attempt
- # to always satisfy this.
- v.satisfy_general(lambda c: c.startswith("ui_auth_session_id = "))
- v.satisfy_general(self._verify_expiry)
-
- v.verify(macaroon, self._macaroon_secret_key)
-
- # Extract the `nonce`, `client_redirect_url`, and maybe the
- # `ui_auth_session_id` from the token.
- nonce = self._get_value_from_macaroon(macaroon, "nonce")
- client_redirect_url = self._get_value_from_macaroon(
- macaroon, "client_redirect_url"
- )
- try:
- ui_auth_session_id = self._get_value_from_macaroon(
- macaroon, "ui_auth_session_id"
- ) # type: Optional[str]
- except ValueError:
- ui_auth_session_id = None
-
- return nonce, client_redirect_url, ui_auth_session_id
-
- def _get_value_from_macaroon(self, macaroon: pymacaroons.Macaroon, key: str) -> str:
- """Extracts a caveat value from a macaroon token.
-
- Args:
- macaroon: the token
- key: the key of the caveat to extract
-
- Returns:
- The extracted value
-
- Raises:
- Exception: if the caveat was not in the macaroon
- """
- prefix = key + " = "
- for caveat in macaroon.caveats:
- if caveat.caveat_id.startswith(prefix):
- return caveat.caveat_id[len(prefix) :]
- raise ValueError("No %s caveat in macaroon" % (key,))
-
- def _verify_expiry(self, caveat: str) -> bool:
- prefix = "time < "
- if not caveat.startswith(prefix):
- return False
- expiry = int(caveat[len(prefix) :])
- now = self.clock.time_msec()
- return now < expiry
-
async def _complete_oidc_login(
self,
userinfo: UserInfo,
@@ -901,8 +781,8 @@ class OidcHandler(BaseHandler):
# and attempt to match it.
attributes = await oidc_response_to_user_attributes(failures=0)
- user_id = UserID(attributes.localpart, self.server_name).to_string()
- users = await self.store.get_users_by_id_case_insensitive(user_id)
+ user_id = UserID(attributes.localpart, self._server_name).to_string()
+ users = await self._store.get_users_by_id_case_insensitive(user_id)
if users:
# If an existing matrix ID is returned, then use it.
if len(users) == 1:
@@ -954,6 +834,148 @@ class OidcHandler(BaseHandler):
return str(remote_user_id)
+class OidcSessionTokenGenerator:
+ """Methods for generating and checking OIDC Session cookies."""
+
+ def __init__(self, hs: "HomeServer"):
+ self._clock = hs.get_clock()
+ self._server_name = hs.hostname
+ self._macaroon_secret_key = hs.config.key.macaroon_secret_key
+
+ def generate_oidc_session_token(
+ self,
+ state: str,
+ session_data: "OidcSessionData",
+ duration_in_ms: int = (60 * 60 * 1000),
+ ) -> str:
+ """Generates a signed token storing data about an OIDC session.
+
+ When Synapse initiates an authorization flow, it creates a random state
+ and a random nonce. Those parameters are given to the provider and
+ should be verified when the client comes back from the provider.
+ It is also used to store the client_redirect_url, which is used to
+ complete the SSO login flow.
+
+ Args:
+ state: The ``state`` parameter passed to the OIDC provider.
+ session_data: data to include in the session token.
+ duration_in_ms: An optional duration for the token in milliseconds.
+ Defaults to an hour.
+
+ Returns:
+ A signed macaroon token with the session information.
+ """
+ macaroon = pymacaroons.Macaroon(
+ location=self._server_name, identifier="key", key=self._macaroon_secret_key,
+ )
+ macaroon.add_first_party_caveat("gen = 1")
+ macaroon.add_first_party_caveat("type = session")
+ macaroon.add_first_party_caveat("state = %s" % (state,))
+ macaroon.add_first_party_caveat("nonce = %s" % (session_data.nonce,))
+ macaroon.add_first_party_caveat(
+ "client_redirect_url = %s" % (session_data.client_redirect_url,)
+ )
+ if session_data.ui_auth_session_id:
+ macaroon.add_first_party_caveat(
+ "ui_auth_session_id = %s" % (session_data.ui_auth_session_id,)
+ )
+ now = self._clock.time_msec()
+ expiry = now + duration_in_ms
+ macaroon.add_first_party_caveat("time < %d" % (expiry,))
+
+ return macaroon.serialize()
+
+ def verify_oidc_session_token(
+ self, session: bytes, state: str
+ ) -> "OidcSessionData":
+        """Verifies and extracts an OIDC session token.
+
+        This verifies that a given session token was issued by this homeserver
+        and extracts the nonce and client_redirect_url caveats.
+
+ Args:
+ session: The session token to verify
+ state: The state the OIDC provider gave back
+
+ Returns:
+ The data extracted from the session cookie
+ """
+ macaroon = pymacaroons.Macaroon.deserialize(session)
+
+ v = pymacaroons.Verifier()
+ v.satisfy_exact("gen = 1")
+ v.satisfy_exact("type = session")
+ v.satisfy_exact("state = %s" % (state,))
+ v.satisfy_general(lambda c: c.startswith("nonce = "))
+ v.satisfy_general(lambda c: c.startswith("client_redirect_url = "))
+ # Sometimes there's a UI auth session ID, it seems to be OK to attempt
+ # to always satisfy this.
+ v.satisfy_general(lambda c: c.startswith("ui_auth_session_id = "))
+ v.satisfy_general(self._verify_expiry)
+
+ v.verify(macaroon, self._macaroon_secret_key)
+
+ # Extract the `nonce`, `client_redirect_url`, and maybe the
+ # `ui_auth_session_id` from the token.
+ nonce = self._get_value_from_macaroon(macaroon, "nonce")
+ client_redirect_url = self._get_value_from_macaroon(
+ macaroon, "client_redirect_url"
+ )
+ try:
+ ui_auth_session_id = self._get_value_from_macaroon(
+ macaroon, "ui_auth_session_id"
+ ) # type: Optional[str]
+ except ValueError:
+ ui_auth_session_id = None
+
+ return OidcSessionData(
+ nonce=nonce,
+ client_redirect_url=client_redirect_url,
+ ui_auth_session_id=ui_auth_session_id,
+ )
+
+ def _get_value_from_macaroon(self, macaroon: pymacaroons.Macaroon, key: str) -> str:
+ """Extracts a caveat value from a macaroon token.
+
+ Args:
+ macaroon: the token
+ key: the key of the caveat to extract
+
+ Returns:
+ The extracted value
+
+ Raises:
+ Exception: if the caveat was not in the macaroon
+ """
+ prefix = key + " = "
+ for caveat in macaroon.caveats:
+ if caveat.caveat_id.startswith(prefix):
+ return caveat.caveat_id[len(prefix) :]
+ raise ValueError("No %s caveat in macaroon" % (key,))
+
+ def _verify_expiry(self, caveat: str) -> bool:
+ prefix = "time < "
+ if not caveat.startswith(prefix):
+ return False
+ expiry = int(caveat[len(prefix) :])
+ now = self._clock.time_msec()
+ return now < expiry
+
+
+@attr.s(frozen=True, slots=True)
+class OidcSessionData:
+    """The attributes which are stored in an OIDC session cookie"""
+
+ # The `nonce` parameter passed to the OIDC provider.
+ nonce = attr.ib(type=str)
+
+ # The URL the client gave when it initiated the flow. ("" if this is a UI Auth)
+ client_redirect_url = attr.ib(type=str)
+
+ # The session ID of the ongoing UI Auth (None if this is a login)
+ ui_auth_session_id = attr.ib(type=Optional[str], default=None)
+
+
UserAttributeDict = TypedDict(
"UserAttributeDict", {"localpart": Optional[str], "display_name": Optional[str]}
)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 5131a0803e..4b102ff9a9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -432,9 +432,17 @@ class ProfileHandler(BaseHandler):
400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN,)
)
+ avatar_url_to_set = new_avatar_url # type: Optional[str]
+ if new_avatar_url == "":
+ avatar_url_to_set = None
+
# Enforce a max avatar size if one is defined
- if self.max_avatar_size or self.allowed_avatar_mimetypes:
- media_id = self._validate_and_parse_media_id_from_avatar_url(new_avatar_url)
+ if avatar_url_to_set and (
+ self.max_avatar_size or self.allowed_avatar_mimetypes
+ ):
+ media_id = self._validate_and_parse_media_id_from_avatar_url(
+ avatar_url_to_set
+ )
# Check that this media exists locally
media_info = await self.store.get_local_media(media_id)
@@ -477,7 +485,7 @@ class ProfileHandler(BaseHandler):
new_batchnum = None
await self.store.set_profile_avatar_url(
- target_user.localpart, new_avatar_url, new_batchnum
+ target_user.localpart, avatar_url_to_set, new_batchnum
)
if self.hs.config.user_directory_search_all_users:
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 2da1ea2223..d096e0b091 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -23,6 +23,7 @@ from typing_extensions import NoReturn, Protocol
from twisted.web.http import Request
from synapse.api.errors import Codes, RedirectException, SynapseError
+from synapse.http import get_request_user_agent
from synapse.http.server import respond_with_html
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, UserID, contains_invalid_mxid_characters
@@ -166,6 +167,37 @@ class SsoHandler:
"""Get the configured identity providers"""
return self._identity_providers
+ async def get_identity_providers_for_user(
+ self, user_id: str
+ ) -> Mapping[str, SsoIdentityProvider]:
+ """Get the SsoIdentityProviders which a user has used
+
+ Given a user id, get the identity providers that that user has used to log in
+ with in the past (and thus could use to re-identify themselves for UI Auth).
+
+ Args:
+ user_id: MXID of user to look up
+
+        Returns:
+ a map of idp_id to SsoIdentityProvider
+ """
+ external_ids = await self._store.get_external_ids_by_user(user_id)
+
+ valid_idps = {}
+ for idp_id, _ in external_ids:
+ idp = self._identity_providers.get(idp_id)
+ if not idp:
+ logger.warning(
+ "User %r has an SSO mapping for IdP %r, but this is no longer "
+ "configured.",
+ user_id,
+ idp_id,
+ )
+ else:
+ valid_idps[idp_id] = idp
+
+ return valid_idps
+
def render_error(
self,
request: Request,
@@ -362,7 +394,7 @@ class SsoHandler:
attributes,
auth_provider_id,
remote_user_id,
- request.get_user_agent(""),
+ get_request_user_agent(request),
request.getClientIP(),
)
@@ -628,7 +660,7 @@ class SsoHandler:
attributes,
session.auth_provider_id,
session.remote_user_id,
- request.get_user_agent(""),
+ get_request_user_agent(request),
request.getClientIP(),
)
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index 824f37f8f8..a68d5e790e 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -20,3 +20,18 @@ TODO: move more stuff out of AuthHandler in here.
"""
from synapse.handlers.ui_auth.checkers import INTERACTIVE_AUTH_CHECKERS # noqa: F401
+
+
+class UIAuthSessionDataConstants:
+ """Constants for use with AuthHandler.set_session_data"""
+
+ # used during registration and password reset to store a hashed copy of the
+ # password, so that the client does not need to submit it each time.
+ PASSWORD_HASH = "password_hash"
+
+ # used during registration to store the mxid of the registered user
+ REGISTERED_USER_ID = "registered_user_id"
+
+ # used by validate_user_via_ui_auth to store the mxid of the user we are validating
+ # for.
+ REQUEST_USER_ID = "request_user_id"
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 59b01b812c..4bc3cb53f0 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -17,6 +17,7 @@ import re
from twisted.internet import task
from twisted.web.client import FileBodyProducer
+from twisted.web.iweb import IRequest
from synapse.api.errors import SynapseError
@@ -50,3 +51,17 @@ class QuieterFileBodyProducer(FileBodyProducer):
FileBodyProducer.stopProducing(self)
except task.TaskStopped:
pass
+
+
+def get_request_user_agent(request: IRequest, default: str = "") -> str:
+ """Return the last User-Agent header, or the given default.
+ """
+ # There could be raw utf-8 bytes in the User-Agent header.
+
+ # N.B. if you don't do this, the logger explodes cryptically
+ # with maximum recursion trying to log errors about
+ # the charset problem.
+ # c.f. https://github.com/matrix-org/synapse/issues/3471
+
+ h = request.getHeader(b"User-Agent")
+ return h.decode("ascii", "replace") if h else default
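# Illustrative check (not part of the patch) of the decoding behaviour above: a
# User-Agent containing raw non-ASCII bytes is decoded with errors="replace"
# rather than raising, so logging the value can never explode. FakeRequest is a
# stand-in for a twisted IRequest.
from typing import Optional


class FakeRequest:
    def __init__(self, ua: Optional[bytes]) -> None:
        self._ua = ua

    def getHeader(self, name: bytes) -> Optional[bytes]:
        return self._ua if name == b"User-Agent" else None


def get_ua(request: FakeRequest, default: str = "") -> str:
    h = request.getHeader(b"User-Agent")
    return h.decode("ascii", "replace") if h else default


assert get_ua(FakeRequest(None), "-") == "-"
assert get_ua(FakeRequest(b"M\xc3\xa4trix-Client")) == "M\ufffd\ufffdtrix-Client"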
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b261e078c4..b7103d6541 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -174,6 +174,16 @@ async def _handle_json_response(
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
body = await make_deferred_yieldable(d)
+ except ValueError as e:
+ # The JSON content was invalid.
+ logger.warning(
+ "{%s} [%s] Failed to parse JSON response - %s %s",
+ request.txn_id,
+ request.destination,
+ request.method,
+ request.uri.decode("ascii"),
+ )
+ raise RequestSendFailed(e, can_retry=False) from e
except defer.TimeoutError as e:
logger.warning(
"{%s} [%s] Timed out reading response - %s %s",
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 5a5790831b..12ec3f851f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -20,7 +20,7 @@ from twisted.python.failure import Failure
from twisted.web.server import Request, Site
from synapse.config.server import ListenerConfig
-from synapse.http import redact_uri
+from synapse.http import get_request_user_agent, redact_uri
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.types import Requester
@@ -113,15 +113,6 @@ class SynapseRequest(Request):
method = self.method.decode("ascii")
return method
- def get_user_agent(self, default: str) -> str:
- """Return the last User-Agent header, or the given default.
- """
- user_agent = self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
- if user_agent is None:
- return default
-
- return user_agent.decode("ascii", "replace")
-
def render(self, resrc):
# this is called once a Resource has been found to serve the request; in our
# case the Resource in question will normally be a JsonResource.
@@ -292,12 +283,7 @@ class SynapseRequest(Request):
# and can see that we're doing something wrong.
authenticated_entity = repr(self.requester) # type: ignore[unreachable]
- # ...or could be raw utf-8 bytes in the User-Agent header.
- # N.B. if you don't do this, the logger explodes cryptically
- # with maximum recursion trying to log errors about
- # the charset problem.
- # c.f. https://github.com/matrix-org/synapse/issues/3471
- user_agent = self.get_user_agent("-")
+ user_agent = get_request_user_agent(self, "-")
code = str(self.code)
if not self.finished:
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 6658c2da56..f39e3d6d5c 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -244,7 +244,7 @@ class UserRestServletV2(RestServlet):
if deactivate and not user["deactivated"]:
await self.deactivate_account_handler.deactivate_account(
- target_user.to_string(), False
+ target_user.to_string(), False, requester, by_admin=True
)
elif not deactivate and user["deactivated"]:
if "password" not in body:
@@ -486,12 +486,22 @@ class WhoisRestServlet(RestServlet):
class DeactivateAccountRestServlet(RestServlet):
PATTERNS = admin_patterns("/deactivate/(?P<target_user_id>[^/]*)")
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self._deactivate_account_handler = hs.get_deactivate_account_handler()
self.auth = hs.get_auth()
+ self.is_mine = hs.is_mine
+ self.store = hs.get_datastore()
+
+    async def on_POST(
+        self, request: SynapseRequest, target_user_id: str
+    ) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request)
+ await assert_user_is_admin(self.auth, requester.user)
+
+ if not self.is_mine(UserID.from_string(target_user_id)):
+ raise SynapseError(400, "Can only deactivate local users")
+
+ if not await self.store.get_user_by_id(target_user_id):
+ raise NotFoundError("User not found")
- async def on_POST(self, request, target_user_id):
- await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request, allow_empty_body=True)
erase = body.get("erase", False)
if not isinstance(erase, bool):
@@ -501,10 +511,8 @@ class DeactivateAccountRestServlet(RestServlet):
Codes.BAD_JSON,
)
- UserID.from_string(target_user_id)
-
result = await self._deactivate_account_handler.deactivate_account(
- target_user_id, erase
+ target_user_id, erase, requester, by_admin=True
)
if result:
id_server_unbind_result = "success"
@@ -714,13 +722,6 @@ class UserMembershipRestServlet(RestServlet):
async def on_GET(self, request, user_id):
await assert_requester_is_admin(self.auth, request)
- if not self.is_mine(UserID.from_string(user_id)):
- raise SynapseError(400, "Can only lookup local users")
-
- user = await self.store.get_user_by_id(user_id)
- if user is None:
- raise NotFoundError("Unknown user")
-
room_ids = await self.store.get_rooms_for_user(user_id)
ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
return 200, ret
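# Hedged usage sketch for the admin deactivation endpoint touched above: the
# handler now checks that the target is a local, known user before
# deactivating, so requests for remote or unknown MXIDs fail with 400/404
# instead of being accepted. The homeserver URL, user ID and token below are
# placeholders.
import json
import urllib.request

HOMESERVER = "https://localhost:8008"  # placeholder
ADMIN_TOKEN = "<admin-access-token>"  # placeholder

req = urllib.request.Request(
    HOMESERVER + "/_synapse/admin/v1/deactivate/@alice:example.com",
    data=json.dumps({"erase": True}).encode("utf-8"),
    headers={
        "Authorization": "Bearer " + ADMIN_TOKEN,
        "Content-Type": "application/json",
    },
    method="POST",
)
# urllib.request.urlopen(req)  # responds with {"id_server_unbind_result": ...}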
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 6f6045d071..7fdfce78ed 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -21,9 +21,6 @@ from http import HTTPStatus
from typing import TYPE_CHECKING
from urllib.parse import urlparse
-if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
-
from synapse.api.constants import LoginType
from synapse.api.errors import (
Codes,
@@ -32,6 +29,7 @@ from synapse.api.errors import (
ThreepidValidationError,
)
from synapse.config.emailconfig import ThreepidBehaviour
+from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http.server import finish_request, respond_with_html
from synapse.http.servlet import (
RestServlet,
@@ -48,6 +46,10 @@ from synapse.util.threepids import canonicalise_email, check_3pid_allowed
from ._base import client_patterns, interactive_auth_handler
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
+
logger = logging.getLogger(__name__)
@@ -199,11 +201,7 @@ class PasswordRestServlet(RestServlet):
params,
session_id,
) = await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "modify your account password",
+ requester, request, body, "modify your account password",
)
except InteractiveAuthIncompleteError as e:
# The user needs to provide more steps to complete auth, but
@@ -225,7 +223,6 @@ class PasswordRestServlet(RestServlet):
[[LoginType.EMAIL_IDENTITY]],
request,
body,
- self.hs.get_ip_from_request(request),
"modify your account password",
)
except InteractiveAuthIncompleteError as e:
@@ -237,7 +234,9 @@ class PasswordRestServlet(RestServlet):
if new_password:
password_hash = await self.auth_handler.hash(new_password)
await self.auth_handler.set_session_data(
- e.session_id, "password_hash", password_hash
+ e.session_id,
+ UIAuthSessionDataConstants.PASSWORD_HASH,
+ password_hash,
)
raise
@@ -270,7 +269,7 @@ class PasswordRestServlet(RestServlet):
password_hash = await self.auth_handler.hash(new_password)
elif session_id is not None:
password_hash = await self.auth_handler.get_session_data(
- session_id, "password_hash", None
+ session_id, UIAuthSessionDataConstants.PASSWORD_HASH, None
)
else:
# UI validation was skipped, but the request did not include a new
@@ -334,19 +333,18 @@ class DeactivateAccountRestServlet(RestServlet):
# allow ASes to deactivate their own users
if requester.app_service:
await self._deactivate_account_handler.deactivate_account(
- requester.user.to_string(), erase
+ requester.user.to_string(), erase, requester
)
return 200, {}
await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "deactivate your account",
+ requester, request, body, "deactivate your account",
)
result = await self._deactivate_account_handler.deactivate_account(
- requester.user.to_string(), erase, id_server=body.get("id_server")
+ requester.user.to_string(),
+ erase,
+ requester,
+ id_server=body.get("id_server"),
)
if result:
id_server_unbind_result = "success"
@@ -773,11 +771,7 @@ class ThreepidAddRestServlet(RestServlet):
assert_valid_client_secret(client_secret)
await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "add a third-party identifier to your account",
+ requester, request, body, "add a third-party identifier to your account",
)
validation_session = await self.identity_handler.validate_threepid_session(
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 9b9514632f..75ece1c911 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -19,7 +19,6 @@ from typing import TYPE_CHECKING
from synapse.api.constants import LoginType
from synapse.api.errors import SynapseError
from synapse.api.urls import CLIENT_API_PREFIX
-from synapse.handlers.sso import SsoIdentityProvider
from synapse.http.server import respond_with_html
from synapse.http.servlet import RestServlet, parse_string
@@ -46,22 +45,6 @@ class AuthRestServlet(RestServlet):
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
self.registration_handler = hs.get_registration_handler()
-
- # SSO configuration.
- self._cas_enabled = hs.config.cas_enabled
- if self._cas_enabled:
- self._cas_handler = hs.get_cas_handler()
- self._cas_server_url = hs.config.cas_server_url
- self._cas_service_url = hs.config.cas_service_url
- self._saml_enabled = hs.config.saml2_enabled
- if self._saml_enabled:
- self._saml_handler = hs.get_saml_handler()
- self._oidc_enabled = hs.config.oidc_enabled
- if self._oidc_enabled:
- self._oidc_handler = hs.get_oidc_handler()
- self._cas_server_url = hs.config.cas_server_url
- self._cas_service_url = hs.config.cas_service_url
-
self.recaptcha_template = hs.config.recaptcha_template
self.terms_template = hs.config.terms_template
self.success_template = hs.config.fallback_success_template
@@ -90,21 +73,7 @@ class AuthRestServlet(RestServlet):
elif stagetype == LoginType.SSO:
# Display a confirmation page which prompts the user to
# re-authenticate with their SSO provider.
-
- if self._cas_enabled:
- sso_auth_provider = self._cas_handler # type: SsoIdentityProvider
- elif self._saml_enabled:
- sso_auth_provider = self._saml_handler
- elif self._oidc_enabled:
- sso_auth_provider = self._oidc_handler
- else:
- raise SynapseError(400, "Homeserver not configured for SSO.")
-
- sso_redirect_url = await sso_auth_provider.handle_redirect_request(
- request, None, session
- )
-
- html = await self.auth_handler.start_sso_ui_auth(sso_redirect_url, session)
+ html = await self.auth_handler.start_sso_ui_auth(request, session)
else:
raise SynapseError(404, "Unknown auth stage type")
@@ -128,7 +97,7 @@ class AuthRestServlet(RestServlet):
authdict = {"response": response, "session": session}
success = await self.auth_handler.add_oob_auth(
- LoginType.RECAPTCHA, authdict, self.hs.get_ip_from_request(request)
+ LoginType.RECAPTCHA, authdict, request.getClientIP()
)
if success:
@@ -144,7 +113,7 @@ class AuthRestServlet(RestServlet):
authdict = {"session": session}
success = await self.auth_handler.add_oob_auth(
- LoginType.TERMS, authdict, self.hs.get_ip_from_request(request)
+ LoginType.TERMS, authdict, request.getClientIP()
)
if success:
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index af117cb27c..314e01dfe4 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -83,11 +83,7 @@ class DeleteDevicesRestServlet(RestServlet):
assert_params_in_dict(body, ["devices"])
await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "remove device(s) from your account",
+ requester, request, body, "remove device(s) from your account",
)
await self.device_handler.delete_devices(
@@ -133,11 +129,7 @@ class DeviceRestServlet(RestServlet):
raise
await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "remove a device from your account",
+ requester, request, body, "remove a device from your account",
)
await self.device_handler.delete_device(requester.user.to_string(), device_id)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index b91996c738..a6134ead8a 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -271,11 +271,7 @@ class SigningKeyUploadServlet(RestServlet):
body = parse_json_object_from_request(request)
await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "add a device signing key to your account",
+ requester, request, body, "add a device signing key to your account",
)
result = await self.e2e_keys_handler.upload_signing_keys_for_user(user_id, body)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index e640dadf06..ad33209bb9 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -40,6 +40,7 @@ from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.config.registration import RegistrationConfig
from synapse.config.server import is_threepid_reserved
from synapse.handlers.auth import AuthHandler
+from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http.server import finish_request, respond_with_html
from synapse.http.servlet import (
RestServlet,
@@ -490,22 +491,18 @@ class RegisterRestServlet(RestServlet):
# user here. We carry on and go through the auth checks though,
# for paranoia.
registered_user_id = await self.auth_handler.get_session_data(
- session_id, "registered_user_id", None
+ session_id, UIAuthSessionDataConstants.REGISTERED_USER_ID, None
)
# Extract the previously-hashed password from the session.
password_hash = await self.auth_handler.get_session_data(
- session_id, "password_hash", None
+ session_id, UIAuthSessionDataConstants.PASSWORD_HASH, None
)
# Check if the user-interactive authentication flows are complete, if
# not this will raise a user-interactive auth error.
try:
auth_result, params, session_id = await self.auth_handler.check_ui_auth(
- self._registration_flows,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "register a new account",
+ self._registration_flows, request, body, "register a new account",
)
except InteractiveAuthIncompleteError as e:
# The user needs to provide more steps to complete auth.
@@ -520,7 +517,9 @@ class RegisterRestServlet(RestServlet):
if not password_hash and password:
password_hash = await self.auth_handler.hash(password)
await self.auth_handler.set_session_data(
- e.session_id, "password_hash", password_hash
+ e.session_id,
+ UIAuthSessionDataConstants.PASSWORD_HASH,
+ password_hash,
)
raise
@@ -709,7 +708,9 @@ class RegisterRestServlet(RestServlet):
# Remember that the user account has been registered (and the user
# ID it was registered with, since it might not have been specified).
await self.auth_handler.set_session_data(
- session_id, "registered_user_id", registered_user_id
+ session_id,
+ UIAuthSessionDataConstants.REGISTERED_USER_ID,
+ registered_user_id,
)
registered = True
diff --git a/synapse/server.py b/synapse/server.py
index bd8ee0e34a..aefb6159d8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -282,10 +282,6 @@ class HomeServer(metaclass=abc.ABCMeta):
"""
return self._reactor
- def get_ip_from_request(self, request) -> str:
- # X-Forwarded-For is handled by our custom request type.
- return request.getClientIP()
-
def is_mine(self, domain_specific_string: DomainSpecificString) -> bool:
return domain_specific_string.domain == self.hostname
@@ -499,7 +495,7 @@ class HomeServer(metaclass=abc.ABCMeta):
return InitialSyncHandler(self)
@cache_in_self
- def get_profile_handler(self):
+ def get_profile_handler(self) -> ProfileHandler:
return ProfileHandler(self)
@cache_in_self
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b70ca3087b..6cfadc2b4e 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -179,6 +179,9 @@ class LoggingDatabaseConnection:
_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
+R = TypeVar("R")
+
+
class LoggingTransaction:
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
@@ -266,6 +269,20 @@ class LoggingTransaction:
for val in args:
self.execute(sql, val)
+ def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
+ """Corresponds to psycopg2.extras.execute_values. Only available when
+ using postgres.
+
+        Always sets fetch=True when calling `execute_values`, so will return the
+ results.
+ """
+ assert isinstance(self.database_engine, PostgresEngine)
+ from psycopg2.extras import execute_values # type: ignore
+
+ return self._do_execute(
+ lambda *x: execute_values(self.txn, *x, fetch=True), sql, *args
+ )
+
def execute(self, sql: str, *args: Any) -> None:
self._do_execute(self.txn.execute, sql, *args)
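# Hedged sketch of what the new execute_values helper wraps: psycopg2's
# extras.execute_values with fetch=True, which expands a single VALUES
# placeholder into a multi-row list and returns the resulting rows. It needs a
# Postgres connection; `conn` is an assumed psycopg2 connection and the query
# mirrors the chain-cover gap query used elsewhere in this change. (Synapse's
# SQL uses "?" placeholders, which end up as psycopg2-style "%s" by the time
# execute_values is called, so the raw psycopg2 form is shown here.)
from psycopg2.extras import execute_values

sql = """
    SELECT event_id
    FROM event_auth_chains AS c, (VALUES %s) AS l(chain_id, min_seq, max_seq)
    WHERE c.chain_id = l.chain_id
      AND min_seq < sequence_number AND sequence_number <= max_seq
"""
args = [(1, 0, 5), (2, 3, 7)]  # (chain_id, min_seq, max_seq) gaps to fetch

# with conn.cursor() as cur:
#     rows = execute_values(cur, sql, args, fetch=True)
#     event_ids = {row[0] for row in rows}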
@@ -276,7 +293,7 @@ class LoggingTransaction:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
return " ".join(line.strip() for line in sql.splitlines() if line.strip())
- def _do_execute(self, func, sql: str, *args: Any) -> None:
+ def _do_execute(self, func: Callable[..., R], sql: str, *args: Any) -> R:
sql = self._make_sql_one_line(sql)
# TODO(paul): Maybe use 'info' and 'debug' for values?
@@ -347,9 +364,6 @@ class PerformanceCounters:
return top_n_counters
-R = TypeVar("R")
-
-
class DatabasePool:
"""Wraps a single physical database and connection pool.
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index bff51e92b9..bad8260892 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -312,12 +312,9 @@ class AccountDataStore(AccountDataWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
self._account_data_id_gen = StreamIdGenerator(
db_conn,
- "account_data_max_stream_id",
+ "room_account_data",
"stream_id",
- extra_tables=[
- ("room_account_data", "stream_id"),
- ("room_tags_revisions", "stream_id"),
- ],
+ extra_tables=[("room_tags_revisions", "stream_id")],
)
super().__init__(database, db_conn, hs)
@@ -362,14 +359,6 @@ class AccountDataStore(AccountDataWorkerStore):
lock=False,
)
- # it's theoretically possible for the above to succeed and the
- # below to fail - in which case we might reuse a stream id on
- # restart, and the above update might not get propagated. That
- # doesn't sound any worse than the whole update getting lost,
- # which is what would happen if we combined the two into one
- # transaction.
- await self._update_max_stream_id(next_id)
-
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id))
@@ -402,18 +391,6 @@ class AccountDataStore(AccountDataWorkerStore):
content,
)
- # it's theoretically possible for the above to succeed and the
- # below to fail - in which case we might reuse a stream id on
- # restart, and the above update might not get propagated. That
- # doesn't sound any worse than the whole update getting lost,
- # which is what would happen if we combined the two into one
- # transaction.
- #
- # Note: This is only here for backwards compat to allow admins to
- # roll back to a previous Synapse version. Next time we update the
- # database version we can remove this table.
- await self._update_max_stream_id(next_id)
-
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.invalidate(
@@ -486,24 +463,3 @@ class AccountDataStore(AccountDataWorkerStore):
# Invalidate the cache for any ignored users which were added or removed.
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
-
- async def _update_max_stream_id(self, next_id: int) -> None:
- """Update the max stream_id
-
- Args:
- next_id: The the revision to advance to.
- """
-
- # Note: This is only here for backwards compat to allow admins to
- # roll back to a previous Synapse version. Next time we update the
- # database version we can remove this table.
-
- def _update(txn):
- update_max_id_sql = (
- "UPDATE account_data_max_stream_id"
- " SET stream_id = ?"
- " WHERE stream_id < ?"
- )
- txn.execute(update_max_id_sql, (next_id, next_id))
-
- await self.db_pool.runInteraction("update_account_data_max_stream_id", _update)
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 4d1b92d1aa..1b6ccd51c8 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -707,50 +707,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
"""Get the current stream id from the _device_list_id_gen"""
...
-
-class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
- async def set_e2e_device_keys(
- self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
- ) -> bool:
- """Stores device keys for a device. Returns whether there was a change
- or the keys were already in the database.
- """
-
- def _set_e2e_device_keys_txn(txn):
- set_tag("user_id", user_id)
- set_tag("device_id", device_id)
- set_tag("time_now", time_now)
- set_tag("device_keys", device_keys)
-
- old_key_json = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="e2e_device_keys_json",
- keyvalues={"user_id": user_id, "device_id": device_id},
- retcol="key_json",
- allow_none=True,
- )
-
- # In py3 we need old_key_json to match new_key_json type. The DB
- # returns unicode while encode_canonical_json returns bytes.
- new_key_json = encode_canonical_json(device_keys).decode("utf-8")
-
- if old_key_json == new_key_json:
- log_kv({"Message": "Device key already stored."})
- return False
-
- self.db_pool.simple_upsert_txn(
- txn,
- table="e2e_device_keys_json",
- keyvalues={"user_id": user_id, "device_id": device_id},
- values={"ts_added_ms": time_now, "key_json": new_key_json},
- )
- log_kv({"message": "Device keys stored."})
- return True
-
- return await self.db_pool.runInteraction(
- "set_e2e_device_keys", _set_e2e_device_keys_txn
- )
-
async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Dict[str, Dict[str, Dict[str, bytes]]]:
@@ -840,6 +796,50 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)
+
+class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
+ async def set_e2e_device_keys(
+ self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
+ ) -> bool:
+ """Stores device keys for a device. Returns whether there was a change
+ or the keys were already in the database.
+ """
+
+ def _set_e2e_device_keys_txn(txn):
+ set_tag("user_id", user_id)
+ set_tag("device_id", device_id)
+ set_tag("time_now", time_now)
+ set_tag("device_keys", device_keys)
+
+ old_key_json = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="e2e_device_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ retcol="key_json",
+ allow_none=True,
+ )
+
+ # In py3 we need old_key_json to match new_key_json type. The DB
+ # returns unicode while encode_canonical_json returns bytes.
+ new_key_json = encode_canonical_json(device_keys).decode("utf-8")
+
+ if old_key_json == new_key_json:
+ log_kv({"Message": "Device key already stored."})
+ return False
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="e2e_device_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ values={"ts_added_ms": time_now, "key_json": new_key_json},
+ )
+ log_kv({"message": "Device keys stored."})
+ return True
+
+ return await self.db_pool.runInteraction(
+ "set_e2e_device_keys", _set_e2e_device_keys_txn
+ )
+
async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
def delete_e2e_keys_by_device_txn(txn):
log_kv(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ebffd89251..8326640d20 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -24,6 +24,8 @@ from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.types import Cursor
from synapse.types import Collection
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@@ -32,6 +34,11 @@ from synapse.util.iterutils import batch_iter
logger = logging.getLogger(__name__)
+class _NoChainCoverIndex(Exception):
+ def __init__(self, room_id: str):
+ super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
+
+
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
@@ -151,15 +158,193 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
The set of the difference in auth chains.
"""
+ # Check if we have indexed the room so we can use the chain cover
+ # algorithm.
+ room = await self.get_room(room_id)
+ if room["has_auth_chain_index"]:
+ try:
+ return await self.db_pool.runInteraction(
+ "get_auth_chain_difference_chains",
+ self._get_auth_chain_difference_using_cover_index_txn,
+ room_id,
+ state_sets,
+ )
+ except _NoChainCoverIndex:
+ # For whatever reason we don't actually have a chain cover index
+ # for the events in question, so we fall back to the old method.
+ pass
+
return await self.db_pool.runInteraction(
"get_auth_chain_difference",
self._get_auth_chain_difference_txn,
state_sets,
)
+ def _get_auth_chain_difference_using_cover_index_txn(
+ self, txn: Cursor, room_id: str, state_sets: List[Set[str]]
+ ) -> Set[str]:
+ """Calculates the auth chain difference using the chain index.
+
+ See docs/auth_chain_difference_algorithm.md for details
+ """
+
+ # First we look up the chain ID/sequence numbers for all the events, and
+ # work out the chain/sequence numbers reachable from each state set.
+
+ initial_events = set(state_sets[0]).union(*state_sets[1:])
+
+ # Map from event_id -> (chain ID, seq no)
+ chain_info = {} # type: Dict[str, Tuple[int, int]]
+
+ # Map from chain ID -> seq no -> event Id
+ chain_to_event = {} # type: Dict[int, Dict[int, str]]
+
+ # All the chains that we've found that are reachable from the state
+ # sets.
+ seen_chains = set() # type: Set[int]
+
+ sql = """
+ SELECT event_id, chain_id, sequence_number
+ FROM event_auth_chains
+ WHERE %s
+ """
+ for batch in batch_iter(initial_events, 1000):
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "event_id", batch
+ )
+ txn.execute(sql % (clause,), args)
+
+ for event_id, chain_id, sequence_number in txn:
+ chain_info[event_id] = (chain_id, sequence_number)
+ seen_chains.add(chain_id)
+ chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
+
+ # Check that we actually have a chain ID for all the events.
+ events_missing_chain_info = initial_events.difference(chain_info)
+ if events_missing_chain_info:
+ # This can happen due to e.g. downgrade/upgrade of the server. We
+ # raise an exception and fall back to the previous algorithm.
+ logger.info(
+ "Unexpectedly found that events don't have chain IDs in room %s: %s",
+ room_id,
+ events_missing_chain_info,
+ )
+ raise _NoChainCoverIndex(room_id)
+
+ # Corresponds to `state_sets`, except as a map from chain ID to max
+ # sequence number reachable from the state set.
+ set_to_chain = [] # type: List[Dict[int, int]]
+ for state_set in state_sets:
+ chains = {} # type: Dict[int, int]
+ set_to_chain.append(chains)
+
+ for event_id in state_set:
+ chain_id, seq_no = chain_info[event_id]
+
+ chains[chain_id] = max(seq_no, chains.get(chain_id, 0))
+
+ # Now we look up all links for the chains we have, adding chains to
+ # set_to_chain that are reachable from each set.
+ sql = """
+ SELECT
+ origin_chain_id, origin_sequence_number,
+ target_chain_id, target_sequence_number
+ FROM event_auth_chain_links
+ WHERE %s
+ """
+
+ # (We need to take a copy of `seen_chains` as we want to mutate it in
+ # the loop)
+ for batch in batch_iter(set(seen_chains), 1000):
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "origin_chain_id", batch
+ )
+ txn.execute(sql % (clause,), args)
+
+ for (
+ origin_chain_id,
+ origin_sequence_number,
+ target_chain_id,
+ target_sequence_number,
+ ) in txn:
+ for chains in set_to_chain:
+                    # chains are only reachable if the origin sequence number of
+                    # the link is less than or equal to the max sequence number
+                    # in the origin chain.
+ if origin_sequence_number <= chains.get(origin_chain_id, 0):
+ chains[target_chain_id] = max(
+ target_sequence_number, chains.get(target_chain_id, 0),
+ )
+
+ seen_chains.add(target_chain_id)
+
+ # Now for each chain we figure out the maximum sequence number reachable
+ # from *any* state set and the minimum sequence number reachable from
+ # *all* state sets. Events in that range are in the auth chain
+ # difference.
+ result = set()
+
+ # Mapping from chain ID to the range of sequence numbers that should be
+ # pulled from the database.
+ chain_to_gap = {} # type: Dict[int, Tuple[int, int]]
+
+ for chain_id in seen_chains:
+ min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
+ max_seq_no = max(chains.get(chain_id, 0) for chains in set_to_chain)
+
+ if min_seq_no < max_seq_no:
+                # We have a non-empty gap, try to fill it from the events that
+                # we already have, otherwise add it to the list of gaps to pull
+                # out of the DB.
+ for seq_no in range(min_seq_no + 1, max_seq_no + 1):
+ event_id = chain_to_event.get(chain_id, {}).get(seq_no)
+ if event_id:
+ result.add(event_id)
+ else:
+ chain_to_gap[chain_id] = (min_seq_no, max_seq_no)
+ break
+
+ if not chain_to_gap:
+ # If there are no gaps to fetch, we're done!
+ return result
+
+ if isinstance(self.database_engine, PostgresEngine):
+ # We can use `execute_values` to efficiently fetch the gaps when
+ # using postgres.
+ sql = """
+ SELECT event_id
+ FROM event_auth_chains AS c, (VALUES ?) AS l(chain_id, min_seq, max_seq)
+ WHERE
+ c.chain_id = l.chain_id
+ AND min_seq < sequence_number AND sequence_number <= max_seq
+ """
+
+ args = [
+ (chain_id, min_no, max_no)
+ for chain_id, (min_no, max_no) in chain_to_gap.items()
+ ]
+
+ rows = txn.execute_values(sql, args)
+ result.update(r for r, in rows)
+ else:
+ # For SQLite we just fall back to doing a noddy for loop.
+ sql = """
+ SELECT event_id FROM event_auth_chains
+ WHERE chain_id = ? AND ? < sequence_number AND sequence_number <= ?
+ """
+ for chain_id, (min_no, max_no) in chain_to_gap.items():
+ txn.execute(sql, (chain_id, min_no, max_no))
+ result.update(r for r, in txn)
+
+ return result
+
def _get_auth_chain_difference_txn(
self, txn, state_sets: List[Set[str]]
) -> Set[str]:
+ """Calculates the auth chain difference using a breadth first search.
+
+ This is used when we don't have a cover index for the room.
+ """
# Algorithm Description
# ~~~~~~~~~~~~~~~~~~~~~
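# Toy, self-contained illustration (not the real SQL) of the final step of the
# chain-cover calculation above: once each state set has been reduced to a map
# of {chain_id: max sequence number reachable from that set}, the auth chain
# difference is exactly the events whose sequence number lies strictly above
# the minimum and at or below the maximum reachable sequence number for their
# chain. All names and values below are made up for the example.
from typing import Dict, List, Set

# chain_id -> sequence number -> event_id (the chain cover of a toy room)
chain_to_event = {
    1: {1: "$create", 2: "$power_levels", 3: "$power_levels2"},
    2: {1: "$alice_join", 2: "$alice_join2"},
}  # type: Dict[int, Dict[int, str]]

# For each state set: chain_id -> max sequence number reachable from that set
set_to_chain = [
    {1: 3, 2: 2},  # state set A reaches further along both chains
    {1: 2, 2: 1},  # state set B
]  # type: List[Dict[int, int]]


def auth_chain_difference(
    chain_to_event: Dict[int, Dict[int, str]], set_to_chain: List[Dict[int, int]]
) -> Set[str]:
    result = set()  # type: Set[str]
    for chain_id, events in chain_to_event.items():
        min_seq = min(chains.get(chain_id, 0) for chains in set_to_chain)
        max_seq = max(chains.get(chain_id, 0) for chains in set_to_chain)
        for seq in range(min_seq + 1, max_seq + 1):
            if seq in events:
                result.add(events[seq])
    return result


assert auth_chain_difference(chain_to_event, set_to_chain) == {
    "$power_levels2",
    "$alice_join2",
}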
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 90fb1a1f00..186f064036 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -17,7 +17,17 @@
import itertools
import logging
from collections import OrderedDict, namedtuple
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ Generator,
+ Iterable,
+ List,
+ Optional,
+ Set,
+ Tuple,
+)
import attr
from prometheus_client import Counter
@@ -33,9 +43,10 @@ from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, sorted_topologically
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -89,6 +100,14 @@ class PersistEventsStore:
self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
+ def get_chain_id_txn(txn):
+ txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
+ return txn.fetchone()[0]
+
+ self._event_chain_id_gen = build_sequence_generator(
+ db.engine, get_chain_id_txn, "event_auth_chain_id"
+ )
+
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id
@@ -366,6 +385,36 @@ class PersistEventsStore:
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
+ self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
+
+ # _store_rejected_events_txn filters out any events which were
+ # rejected, and returns the filtered list.
+ events_and_contexts = self._store_rejected_events_txn(
+ txn, events_and_contexts=events_and_contexts
+ )
+
+ # From this point onwards the events are only ones that weren't
+ # rejected.
+
+ self._update_metadata_tables_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
+ backfilled=backfilled,
+ )
+
+ # We call this last as it assumes we've inserted the events into
+ # room_memberships, where applicable.
+ self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+
+ def _persist_event_auth_chain_txn(
+ self, txn: LoggingTransaction, events: List[EventBase],
+ ) -> None:
+
+        # We only care about state events, so return early if there are none.
+ if not any(e.is_state() for e in events):
+ return
+
# We want to store event_auth mappings for rejected events, as they're
# used in state res v2.
# This is only necessary if the rejected event appears in an accepted
@@ -381,31 +430,357 @@ class PersistEventsStore:
"room_id": event.room_id,
"auth_id": auth_id,
}
- for event, _ in events_and_contexts
+ for event in events
for auth_id in event.auth_event_ids()
if event.is_state()
],
)
- # _store_rejected_events_txn filters out any events which were
- # rejected, and returns the filtered list.
- events_and_contexts = self._store_rejected_events_txn(
- txn, events_and_contexts=events_and_contexts
+ # We now calculate chain ID/sequence numbers for any state events we're
+ # persisting. We ignore out of band memberships as we're not in the room
+ # and won't have their auth chain (we'll fix it up later if we join the
+ # room).
+ #
+ # See: docs/auth_chain_difference_algorithm.md
+
+ # We ignore legacy rooms that we aren't filling the chain cover index
+ # for.
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="rooms",
+ column="room_id",
+ iterable={event.room_id for event in events if event.is_state()},
+ keyvalues={},
+ retcols=("room_id", "has_auth_chain_index"),
)
+ rooms_using_chain_index = {
+ row["room_id"] for row in rows if row["has_auth_chain_index"]
+ }
- # From this point onwards the events are only ones that weren't
- # rejected.
+ state_events = {
+ event.event_id: event
+ for event in events
+ if event.is_state() and event.room_id in rooms_using_chain_index
+ }
- self._update_metadata_tables_txn(
+ if not state_events:
+ return
+
+ # Map from event ID to chain ID/sequence number.
+ chain_map = {} # type: Dict[str, Tuple[int, int]]
+
+ # We need to know the type/state_key and auth events of the events we're
+ # calculating chain IDs for. We don't rely on having the full Event
+ # instances as we'll potentially be pulling more events from the DB and
+ # we don't need the overhead of fetching/parsing the full event JSON.
+ event_to_types = {
+ e.event_id: (e.type, e.state_key) for e in state_events.values()
+ }
+ event_to_auth_chain = {
+ e.event_id: e.auth_event_ids() for e in state_events.values()
+ }
+
+ # Set of event IDs to calculate chain ID/seq numbers for.
+ events_to_calc_chain_id_for = set(state_events)
+
+ # We check if there are any events that need to be handled in the rooms
+ # we're looking at. These should just be out of band memberships, where
+ # we didn't have the auth chain when we first persisted.
+ rows = self.db_pool.simple_select_many_txn(
txn,
- events_and_contexts=events_and_contexts,
- all_events_and_contexts=all_events_and_contexts,
- backfilled=backfilled,
+ table="event_auth_chain_to_calculate",
+ keyvalues={},
+ column="room_id",
+ iterable={e.room_id for e in state_events.values()},
+ retcols=("event_id", "type", "state_key"),
)
+ for row in rows:
+ event_id = row["event_id"]
+ event_type = row["type"]
+ state_key = row["state_key"]
+
+ # (We could pull out the auth events for all rows at once using
+ # simple_select_many, but this case happens rarely and almost always
+ # with a single row.)
+ auth_events = self.db_pool.simple_select_onecol_txn(
+ txn, "event_auth", keyvalues={"event_id": event_id}, retcol="auth_id",
+ )
- # We call this last as it assumes we've inserted the events into
- # room_memberships, where applicable.
- self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+ events_to_calc_chain_id_for.add(event_id)
+ event_to_types[event_id] = (event_type, state_key)
+ event_to_auth_chain[event_id] = auth_events
+
+ # First we get the chain ID and sequence numbers for the events'
+ # auth events (that aren't also currently being persisted).
+ #
+        # Note that there is an edge case here where we might not have
+ # calculated chains and sequence numbers for events that were "out
+ # of band". We handle this case by fetching the necessary info and
+ # adding it to the set of events to calculate chain IDs for.
+
+ missing_auth_chains = {
+ a_id
+ for auth_events in event_to_auth_chain.values()
+ for a_id in auth_events
+ if a_id not in events_to_calc_chain_id_for
+ }
+
+ # We loop here in case we find an out of band membership and need to
+ # fetch their auth event info.
+ while missing_auth_chains:
+ sql = """
+ SELECT event_id, events.type, state_key, chain_id, sequence_number
+ FROM events
+ INNER JOIN state_events USING (event_id)
+ LEFT JOIN event_auth_chains USING (event_id)
+ WHERE
+ """
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "event_id", missing_auth_chains,
+ )
+ txn.execute(sql + clause, args)
+
+ missing_auth_chains.clear()
+
+ for auth_id, event_type, state_key, chain_id, sequence_number in txn:
+ event_to_types[auth_id] = (event_type, state_key)
+
+ if chain_id is None:
+ # No chain ID, so the event was persisted out of band.
+ # We add to list of events to calculate auth chains for.
+
+ events_to_calc_chain_id_for.add(auth_id)
+
+ event_to_auth_chain[
+ auth_id
+ ] = self.db_pool.simple_select_onecol_txn(
+ txn,
+ "event_auth",
+ keyvalues={"event_id": auth_id},
+ retcol="auth_id",
+ )
+
+ missing_auth_chains.update(
+ e
+ for e in event_to_auth_chain[auth_id]
+ if e not in event_to_types
+ )
+ else:
+ chain_map[auth_id] = (chain_id, sequence_number)
+
+        # Now we check if there are any events for which we don't have an auth
+        # chain; these should only be out-of-band memberships.
+ for event_id in sorted_topologically(event_to_auth_chain, event_to_auth_chain):
+ for auth_id in event_to_auth_chain[event_id]:
+ if (
+ auth_id not in chain_map
+ and auth_id not in events_to_calc_chain_id_for
+ ):
+ events_to_calc_chain_id_for.discard(event_id)
+
+ # If this is an event we're trying to persist we add it to
+ # the list of events to calculate chain IDs for next time
+ # around. (Otherwise we will have already added it to the
+ # table).
+ event = state_events.get(event_id)
+ if event:
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="event_auth_chain_to_calculate",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ )
+
+ # We stop checking the event's auth events since we've
+ # discarded it.
+ break
+
+ if not events_to_calc_chain_id_for:
+ return
+
+ # We now calculate the chain IDs/sequence numbers for the events. We
+ # do this by looking at the chain ID and sequence number of any auth
+ # event with the same type/state_key and incrementing the sequence
+ # number by one. If there was no match or the chain ID/sequence
+ # number is already taken we generate a new chain.
+ #
+ # We need to do this in a topologically sorted order as we want to
+ # generate chain IDs/sequence numbers of an event's auth events
+ # before the event itself.
+ chains_tuples_allocated = set() # type: Set[Tuple[int, int]]
+ new_chain_tuples = {} # type: Dict[str, Tuple[int, int]]
+ for event_id in sorted_topologically(
+ events_to_calc_chain_id_for, event_to_auth_chain
+ ):
+ existing_chain_id = None
+ for auth_id in event_to_auth_chain[event_id]:
+ if event_to_types.get(event_id) == event_to_types.get(auth_id):
+ existing_chain_id = chain_map[auth_id]
+ break
+
+ new_chain_tuple = None
+ if existing_chain_id:
+                # We found a chain ID/sequence number candidate, check it's
+                # not already taken.
+ proposed_new_id = existing_chain_id[0]
+ proposed_new_seq = existing_chain_id[1] + 1
+ if (proposed_new_id, proposed_new_seq) not in chains_tuples_allocated:
+ already_allocated = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="event_auth_chains",
+ keyvalues={
+ "chain_id": proposed_new_id,
+ "sequence_number": proposed_new_seq,
+ },
+ retcol="event_id",
+ allow_none=True,
+ )
+ if already_allocated:
+ # Mark it as already allocated so we don't need to hit
+ # the DB again.
+ chains_tuples_allocated.add((proposed_new_id, proposed_new_seq))
+ else:
+ new_chain_tuple = (
+ proposed_new_id,
+ proposed_new_seq,
+ )
+
+ if not new_chain_tuple:
+ new_chain_tuple = (self._event_chain_id_gen.get_next_id_txn(txn), 1)
+
+ chains_tuples_allocated.add(new_chain_tuple)
+
+ chain_map[event_id] = new_chain_tuple
+ new_chain_tuples[event_id] = new_chain_tuple
+
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="event_auth_chains",
+ values=[
+ {"event_id": event_id, "chain_id": c_id, "sequence_number": seq}
+ for event_id, (c_id, seq) in new_chain_tuples.items()
+ ],
+ )
+
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="event_auth_chain_to_calculate",
+ keyvalues={},
+ column="event_id",
+ iterable=new_chain_tuples,
+ )
+
+ # Now we need to calculate any new links between chains caused by
+ # the new events.
+ #
+ # Links are pairs of chain ID/sequence numbers such that for any
+ # event A (CA, SA) and any event B (CB, SB), B is in A's auth chain
+ # if and only if there is at least one link (CA, S1) -> (CB, S2)
+ # where SA >= S1 and S2 >= SB.
+ #
+ # We try and avoid adding redundant links to the table, e.g. if we
+ # have two links between two chains which both start/end at the
+        # same sequence number (or cross) then one can be safely dropped.
+ #
+ # To calculate new links we look at every new event and:
+ # 1. Fetch the chain ID/sequence numbers of its auth events,
+ # discarding any that are reachable by other auth events, or
+ # that have the same chain ID as the event.
+ # 2. For each retained auth event we:
+ # a. Add a link from the event's to the auth event's chain
+ # ID/sequence number; and
+ # b. Add a link from the event to every chain reachable by the
+ # auth event.
+
+ # Step 1, fetch all existing links from all the chains we've seen
+ # referenced.
+ chain_links = _LinkMap()
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="event_auth_chain_links",
+ column="origin_chain_id",
+ iterable={chain_id for chain_id, _ in chain_map.values()},
+ keyvalues={},
+ retcols=(
+ "origin_chain_id",
+ "origin_sequence_number",
+ "target_chain_id",
+ "target_sequence_number",
+ ),
+ )
+ for row in rows:
+ chain_links.add_link(
+ (row["origin_chain_id"], row["origin_sequence_number"]),
+ (row["target_chain_id"], row["target_sequence_number"]),
+ new=False,
+ )
+
+        # We do this in topological order to avoid adding redundant links.
+ for event_id in sorted_topologically(
+ events_to_calc_chain_id_for, event_to_auth_chain
+ ):
+ chain_id, sequence_number = chain_map[event_id]
+
+ # Filter out auth events that are reachable by other auth
+ # events. We do this by looking at every permutation of pairs of
+ # auth events (A, B) to check if B is reachable from A.
+ reduction = {
+ a_id
+ for a_id in event_to_auth_chain[event_id]
+ if chain_map[a_id][0] != chain_id
+ }
+ for start_auth_id, end_auth_id in itertools.permutations(
+ event_to_auth_chain[event_id], r=2,
+ ):
+ if chain_links.exists_path_from(
+ chain_map[start_auth_id], chain_map[end_auth_id]
+ ):
+ reduction.discard(end_auth_id)
+
+ # Step 2, figure out what the new links are from the reduced
+ # list of auth events.
+ for auth_id in reduction:
+ auth_chain_id, auth_sequence_number = chain_map[auth_id]
+
+ # Step 2a, add link between the event and auth event
+ chain_links.add_link(
+ (chain_id, sequence_number), (auth_chain_id, auth_sequence_number)
+ )
+
+ # Step 2b, add a link to chains reachable from the auth
+ # event.
+ for target_id, target_seq in chain_links.get_links_from(
+ (auth_chain_id, auth_sequence_number)
+ ):
+ if target_id == chain_id:
+ continue
+
+ chain_links.add_link(
+ (chain_id, sequence_number), (target_id, target_seq)
+ )
+
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="event_auth_chain_links",
+ values=[
+ {
+ "origin_chain_id": source_id,
+ "origin_sequence_number": source_seq,
+ "target_chain_id": target_id,
+ "target_sequence_number": target_seq,
+ }
+ for (
+ source_id,
+ source_seq,
+ target_id,
+ target_seq,
+ ) in chain_links.get_additions()
+ ],
+ )
def _persist_transaction_ids_txn(
self,
@@ -799,7 +1174,8 @@ class PersistEventsStore:
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
def _store_event_txn(self, txn, events_and_contexts):
- """Insert new events into the event and event_json tables
+ """Insert new events into the event, event_json, redaction and
+ state_events tables.
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
@@ -871,6 +1247,29 @@ class PersistEventsStore:
updatevalues={"have_censored": False},
)
+ state_events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0].is_state()
+ ]
+
+ state_values = []
+ for event, context in state_events_and_contexts:
+ vals = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+
+ # TODO: How does this work with backfilling?
+ if hasattr(event, "replaces_state"):
+ vals["prev_state"] = event.replaces_state
+
+ state_values.append(vals)
+
+ self.db_pool.simple_insert_many_txn(
+ txn, table="state_events", values=state_values
+ )
+
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
rejected
@@ -987,29 +1386,6 @@ class PersistEventsStore:
txn, [event for event, _ in events_and_contexts]
)
- state_events_and_contexts = [
- ec for ec in events_and_contexts if ec[0].is_state()
- ]
-
- state_values = []
- for event, context in state_events_and_contexts:
- vals = {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- }
-
- # TODO: How does this work with backfilling?
- if hasattr(event, "replaces_state"):
- vals["prev_state"] = event.replaces_state
-
- state_values.append(vals)
-
- self.db_pool.simple_insert_many_txn(
- txn, table="state_events", values=state_values
- )
-
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
@@ -1520,3 +1896,131 @@ class PersistEventsStore:
if not ev.internal_metadata.is_outlier()
],
)
+
+
+@attr.s(slots=True)
+class _LinkMap:
+ """A helper type for tracking links between chains.
+ """
+
+ # Stores the set of links as nested maps: source chain ID -> target chain ID
+ # -> source sequence number -> target sequence number.
+ maps = attr.ib(type=Dict[int, Dict[int, Dict[int, int]]], factory=dict)
+
+ # Stores the links that have been added (with new set to true), as tuples of
+ # `(source chain ID, source sequence no, target chain ID, target sequence no.)`
+ additions = attr.ib(type=Set[Tuple[int, int, int, int]], factory=set)
+
+ def add_link(
+ self,
+ src_tuple: Tuple[int, int],
+ target_tuple: Tuple[int, int],
+ new: bool = True,
+ ) -> bool:
+ """Add a new link between two chains, ensuring no redundant links are added.
+
+ New links should be added in topological order.
+
+ Args:
+ src_tuple: The chain ID/sequence number of the source of the link.
+ target_tuple: The chain ID/sequence number of the target of the link.
+ new: Whether this is a "new" link, i.e. should it be returned
+ by `get_additions`.
+
+ Returns:
+ True if a link was added, false if the given link was dropped as redundant
+ """
+ src_chain, src_seq = src_tuple
+ target_chain, target_seq = target_tuple
+
+ current_links = self.maps.setdefault(src_chain, {}).setdefault(target_chain, {})
+
+ assert src_chain != target_chain
+
+ if new:
+ # Check if the new link is redundant
+ for current_seq_src, current_seq_target in current_links.items():
+                # If a link "crosses" another link then it's redundant. For example
+ # in the following link 1 (L1) is redundant, as any event reachable
+ # via L1 is *also* reachable via L2.
+ #
+ # Chain A Chain B
+ # | |
+ # L1 |------ |
+ # | | |
+ # L2 |---- | -->|
+ # | | |
+ # | |--->|
+ # | |
+ # | |
+ #
+ # So we only need to keep links which *do not* cross, i.e. links
+ # that both start and end above or below an existing link.
+ #
+ # Note, since we add links in topological ordering we should never
+ # see `src_seq` less than `current_seq_src`.
+
+ if current_seq_src <= src_seq and target_seq <= current_seq_target:
+ # This new link is redundant, nothing to do.
+ return False
+
+ self.additions.add((src_chain, src_seq, target_chain, target_seq))
+
+ current_links[src_seq] = target_seq
+ return True
+
+ def get_links_from(
+ self, src_tuple: Tuple[int, int]
+ ) -> Generator[Tuple[int, int], None, None]:
+ """Gets the chains reachable from the given chain/sequence number.
+
+ Yields:
+ The chain ID and sequence number the link points to.
+ """
+ src_chain, src_seq = src_tuple
+ for target_id, sequence_numbers in self.maps.get(src_chain, {}).items():
+ for link_src_seq, target_seq in sequence_numbers.items():
+ if link_src_seq <= src_seq:
+ yield target_id, target_seq
+
+ def get_links_between(
+ self, source_chain: int, target_chain: int
+ ) -> Generator[Tuple[int, int], None, None]:
+ """Gets the links between two chains.
+
+ Yields:
+ The source and target sequence numbers.
+ """
+
+ yield from self.maps.get(source_chain, {}).get(target_chain, {}).items()
+
+ def get_additions(self) -> Generator[Tuple[int, int, int, int], None, None]:
+ """Gets any newly added links.
+
+ Yields:
+ The source chain ID/sequence number and target chain ID/sequence number
+ """
+
+ for src_chain, src_seq, target_chain, _ in self.additions:
+ target_seq = self.maps.get(src_chain, {}).get(target_chain, {}).get(src_seq)
+ if target_seq is not None:
+ yield (src_chain, src_seq, target_chain, target_seq)
+
+ def exists_path_from(
+ self, src_tuple: Tuple[int, int], target_tuple: Tuple[int, int],
+ ) -> bool:
+ """Checks if there is a path between the source chain ID/sequence and
+ target chain ID/sequence.
+ """
+ src_chain, src_seq = src_tuple
+ target_chain, target_seq = target_tuple
+
+ if src_chain == target_chain:
+ return target_seq <= src_seq
+
+ links = self.get_links_between(src_chain, target_chain)
+ for link_start_seq, link_end_seq in links:
+ if link_start_seq <= src_seq and target_seq <= link_end_seq:
+ return True
+
+ return False
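# Stand-alone illustration (mirroring, not reusing, the _LinkMap logic above)
# of why a link that is "covered" by an existing link is redundant: if chain A
# already links to chain B at (s1 -> t1), then a later link (s2 -> t2) with
# s1 <= s2 and t2 <= t1 reaches nothing new, because everything at or below t2
# in chain B was already reachable via t1.
from typing import Dict


def is_redundant(existing_links: Dict[int, int], src_seq: int, target_seq: int) -> bool:
    """existing_links maps source seq -> target seq for one (A -> B) chain pair."""
    return any(s <= src_seq and target_seq <= t for s, t in existing_links.items())


links_a_to_b = {2: 5}  # we already have a link A@2 -> B@5
assert is_redundant(links_a_to_b, 4, 3)  # A@4 -> B@3 adds nothing new
assert not is_redundant(links_a_to_b, 1, 6)  # starts earlier in A, reaches later in B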
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 97b6754846..7e4b175d08 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -14,10 +14,15 @@
# limitations under the License.
import logging
+from typing import List, Tuple
from synapse.api.constants import EventContentFields
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
+from synapse.storage.types import Cursor
+from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -99,6 +104,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
columns=["user_id", "created_ts"],
)
+ self.db_pool.updates.register_background_update_handler(
+ "rejected_events_metadata", self._rejected_events_metadata,
+ )
+
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@@ -582,3 +591,118 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
await self.db_pool.updates._end_background_update("event_store_labels")
return num_rows
+
+ async def _rejected_events_metadata(self, progress: dict, batch_size: int) -> int:
+ """Adds rejected events to the `state_events` and `event_auth` metadata
+ tables.
+ """
+
+ last_event_id = progress.get("last_event_id", "")
+
+ def get_rejected_events(
+ txn: Cursor,
+ ) -> List[Tuple[str, str, JsonDict, bool, bool]]:
+ # Fetch rejected event json, their room version and whether we have
+ # inserted them into the state_events or auth_events tables.
+ #
+ # Note we can assume that events that don't have a corresponding
+ # room version are V1 rooms.
+ sql = """
+ SELECT DISTINCT
+ event_id,
+ COALESCE(room_version, '1'),
+ json,
+ state_events.event_id IS NOT NULL,
+ event_auth.event_id IS NOT NULL
+ FROM rejections
+ INNER JOIN event_json USING (event_id)
+ LEFT JOIN rooms USING (room_id)
+ LEFT JOIN state_events USING (event_id)
+ LEFT JOIN event_auth USING (event_id)
+ WHERE event_id > ?
+ ORDER BY event_id
+ LIMIT ?
+ """
+
+ txn.execute(sql, (last_event_id, batch_size,))
+
+ return [(row[0], row[1], db_to_json(row[2]), row[3], row[4]) for row in txn] # type: ignore
+
+ results = await self.db_pool.runInteraction(
+ desc="_rejected_events_metadata_get", func=get_rejected_events
+ )
+
+ if not results:
+ await self.db_pool.updates._end_background_update(
+ "rejected_events_metadata"
+ )
+ return 0
+
+ state_events = []
+ auth_events = []
+ for event_id, room_version, event_json, has_state, has_event_auth in results:
+ last_event_id = event_id
+
+ if has_state and has_event_auth:
+ continue
+
+ room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version)
+ if not room_version_obj:
+ # We no longer support this room version, so we just ignore the
+ # events entirely.
+ logger.info(
+ "Ignoring event with unknown room version %r: %r",
+ room_version,
+ event_id,
+ )
+ continue
+
+ event = make_event_from_dict(event_json, room_version_obj)
+
+ if not event.is_state():
+ continue
+
+ if not has_state:
+ state_events.append(
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+ )
+
+ if not has_event_auth:
+ for auth_id in event.auth_event_ids():
+ auth_events.append(
+ {
+ "room_id": event.room_id,
+ "event_id": event.event_id,
+ "auth_id": auth_id,
+ }
+ )
+
+ if state_events:
+ await self.db_pool.simple_insert_many(
+ table="state_events",
+ values=state_events,
+ desc="_rejected_events_metadata_state_events",
+ )
+
+ if auth_events:
+ await self.db_pool.simple_insert_many(
+ table="event_auth",
+ values=auth_events,
+ desc="_rejected_events_metadata_event_auth",
+ )
+
+ await self.db_pool.updates._background_update_progress(
+ "rejected_events_metadata", {"last_event_id": last_event_id}
+ )
+
+ if len(results) < batch_size:
+ await self.db_pool.updates._end_background_update(
+ "rejected_events_metadata"
+ )
+
+ return len(results)
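# Simplified sketch (illustrative only) of the batching pattern the background
# update above relies on: page through rows ordered by event_id, resuming from
# the last event_id seen, and finish once a batch comes back smaller than the
# requested batch size.
from typing import List


def run_in_batches(all_event_ids: List[str], batch_size: int) -> List[List[str]]:
    batches = []
    last_event_id = ""
    while True:
        batch = sorted(e for e in all_event_ids if e > last_event_id)[:batch_size]
        if not batch:
            break
        batches.append(batch)
        last_event_id = batch[-1]
        if len(batch) < batch_size:
            break
    return batches


assert run_in_batches(["$a", "$b", "$c", "$d", "$e"], 2) == [
    ["$a", "$b"],
    ["$c", "$d"],
    ["$e"],
]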
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 2570003457..4360dc0afc 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -145,7 +145,7 @@ class ProfileWorkerStore(SQLBaseStore):
)
async def set_profile_avatar_url(
- self, user_localpart: str, new_avatar_url: str, batchnum: int
+ self, user_localpart: str, new_avatar_url: Optional[str], batchnum: int
) -> None:
# Invalidate the read cache for this user
self.get_profile_avatar_url.invalidate((user_localpart,))
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 8616034e04..8c71c05c4d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -83,7 +83,7 @@ class RoomWorkerStore(SQLBaseStore):
return await self.db_pool.simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
- retcols=("room_id", "is_public", "creator"),
+ retcols=("room_id", "is_public", "creator", "has_auth_chain_index"),
desc="get_room",
allow_none=True,
)
@@ -1191,6 +1191,37 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
# It's overridden by RoomStore for the synapse master.
raise NotImplementedError()
+ async def has_auth_chain_index(self, room_id: str) -> bool:
+ """Check if the room has (or can have) a chain cover index.
+
+        Defaults to True if we have neither an entry in the `rooms` table nor
+        any events for the room.
+ """
+
+ has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ retcol="has_auth_chain_index",
+ desc="has_auth_chain_index",
+ allow_none=True,
+ )
+
+ if has_auth_chain_index:
+ return True
+
+ # It's possible that we already have events for the room in our DB
+ # without a corresponding room entry. If we do then we don't want to
+ # mark the room as having an auth chain cover index.
+ max_ordering = await self.db_pool.simple_select_one_onecol(
+ table="events",
+ keyvalues={"room_id": room_id},
+ retcol="MAX(stream_ordering)",
+ allow_none=True,
+ desc="upsert_room_on_join",
+ )
+
+ return max_ordering is None
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@@ -1204,12 +1235,21 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
Called when we join a room over federation, and overwrites any room version
currently in the table.
"""
+ # It's possible that we already have events for the room in our DB
+ # without a corresponding room entry. If we do then we don't want to
+ # mark the room as having an auth chain cover index.
+ has_auth_chain_index = await self.has_auth_chain_index(room_id)
+
await self.db_pool.simple_upsert(
desc="upsert_room_on_join",
table="rooms",
keyvalues={"room_id": room_id},
values={"room_version": room_version.identifier},
- insertion_values={"is_public": False, "creator": ""},
+ insertion_values={
+ "is_public": False,
+ "creator": "",
+ "has_auth_chain_index": has_auth_chain_index,
+ },
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
@@ -1244,6 +1284,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
"creator": room_creator_user_id,
"is_public": is_public,
"room_version": room_version.identifier,
+ "has_auth_chain_index": True,
},
)
if is_public:
@@ -1272,6 +1313,11 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
When we receive an invite or any other event over federation that may relate to a room
we are not in, store the version of the room if we don't already know the room version.
"""
+ # It's possible that we already have events for the room in our DB
+ # without a corresponding room entry. If we do then we don't want to
+ # mark the room as having an auth chain cover index.
+ has_auth_chain_index = await self.has_auth_chain_index(room_id)
+
await self.db_pool.simple_upsert(
desc="maybe_store_room_on_outlier_membership",
table="rooms",
@@ -1281,6 +1327,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
"room_version": room_version.identifier,
"is_public": False,
"creator": "",
+ "has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql
new file mode 100644
index 0000000000..9c95646281
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (5828, 'rejected_events_metadata', '{}');
diff --git a/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql
new file mode 100644
index 0000000000..729196cfd5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql
@@ -0,0 +1,52 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- See docs/auth_chain_difference_algorithm.md
+
+CREATE TABLE event_auth_chains (
+ event_id TEXT PRIMARY KEY,
+ chain_id BIGINT NOT NULL,
+ sequence_number BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX event_auth_chains_c_seq_index ON event_auth_chains (chain_id, sequence_number);
+
+
+CREATE TABLE event_auth_chain_links (
+ origin_chain_id BIGINT NOT NULL,
+ origin_sequence_number BIGINT NOT NULL,
+
+ target_chain_id BIGINT NOT NULL,
+ target_sequence_number BIGINT NOT NULL
+);
+
+
+CREATE INDEX event_auth_chain_links_idx ON event_auth_chain_links (origin_chain_id, target_chain_id);
+
+
+-- Events that we have persisted but not calculated auth chains for,
+-- e.g. out of band memberships (where we don't have the auth chain)
+CREATE TABLE event_auth_chain_to_calculate (
+ event_id TEXT PRIMARY KEY,
+ room_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL
+);
+
+CREATE INDEX event_auth_chain_to_calculate_rm_id ON event_auth_chain_to_calculate(room_id);
+
+
+-- Whether we've calculated the above index for a room.
+ALTER TABLE rooms ADD COLUMN has_auth_chain_index BOOLEAN;
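
For orientation (docs/auth_chain_difference_algorithm.md is the authoritative description): `event_auth_chains` assigns each event a position in a chain, an event can reach every earlier position in its own chain, and a row in `event_auth_chain_links` says that a position in one chain (and everything after it) can reach a position in another chain (and everything before it). A rough Python sketch of that reachability rule follows; it assumes the stored links are kept transitively closed so a single hop suffices, which is the behaviour the `_LinkMap` tests later in this patch exercise.

from typing import Dict, List, Tuple

# (chain_id, sequence_number) positions, as stored in event_auth_chains.
ChainPos = Tuple[int, int]
# (origin_chain_id, target_chain_id) -> [(origin_seq, target_seq), ...],
# as stored in event_auth_chain_links.
Links = Dict[Tuple[int, int], List[Tuple[int, int]]]

def exists_path_from(links: Links, src: ChainPos, target: ChainPos) -> bool:
    src_chain, src_seq = src
    target_chain, target_seq = target

    # Within a chain, an event can reach every earlier event in that chain.
    if src_chain == target_chain:
        return target_seq <= src_seq

    # Across chains, a link (o_seq -> t_seq) covers everything at or after
    # o_seq in the origin chain and everything at or before t_seq in the
    # target chain.
    return any(
        src_seq >= o_seq and target_seq <= t_seq
        for o_seq, t_seq in links.get((src_chain, target_chain), [])
    )

# Mirrors the LinkMapTestCase assertions: chain 1 links to chain 2 at 1 -> 1.
links = {(1, 2): [(1, 1)]}
assert exists_path_from(links, (1, 5), (2, 1))
assert not exists_path_from(links, (1, 5), (2, 2))
assert exists_path_from(links, (1, 5), (1, 1))
assert not exists_path_from(links, (1, 1), (1, 5))
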
diff --git a/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres
new file mode 100644
index 0000000000..e8a035bbeb
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS event_auth_chain_id;
diff --git a/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql b/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql
new file mode 100644
index 0000000000..64ab696cfe
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is no longer used and was only kept until we bumped the schema version.
+DROP TABLE IF EXISTS account_data_max_stream_id;
diff --git a/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql b/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql
new file mode 100644
index 0000000000..fb71b360a0
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is no longer used and was only kept until we bumped the schema version.
+DROP TABLE IF EXISTS cache_invalidation_stream;
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 9f120d3cb6..74da9c49f2 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -255,16 +255,6 @@ class TagsStore(TagsWorkerStore):
self._account_data_stream_cache.entity_has_changed, user_id, next_id
)
- # Note: This is only here for backwards compat to allow admins to
- # roll back to a previous Synapse version. Next time we update the
- # database version we can remove this table.
- update_max_id_sql = (
- "UPDATE account_data_max_stream_id"
- " SET stream_id = ?"
- " WHERE stream_id < ?"
- )
- txn.execute(update_max_id_sql, (next_id, next_id))
-
update_sql = (
"UPDATE room_tags_revisions"
" SET stream_id = ?"
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 01efb2cabb..566ea19bae 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -35,9 +35,6 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-# XXX: If you're about to bump this to 59 (or higher) please create an update
-# that drops the unused `cache_invalidation_stream` table, as per #7436!
-# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
SCHEMA_VERSION = 59
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/types.py b/synapse/types.py
index 4f3064a346..773da6022d 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -38,6 +38,7 @@ from six.moves import filter
from unpaddedbase64 import decode_base64
from synapse.api.errors import Codes, SynapseError
+from synapse.http.endpoint import parse_and_validate_server_name
if TYPE_CHECKING:
from synapse.appservice.api import ApplicationService
@@ -258,8 +259,13 @@ class DomainSpecificString(
@classmethod
def is_valid(cls: Type[DS], s: str) -> bool:
+ """Parses the input string and attempts to ensure it is valid."""
try:
- cls.from_string(s)
+ obj = cls.from_string(s)
+ # Apply additional validation to the domain. This is only done
+ # during is_valid (and not part of from_string) since it is
+ # possible for invalid data to exist in room-state, etc.
+ parse_and_validate_server_name(obj.domain)
return True
except Exception:
return False
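
With this change `is_valid` also rejects identifiers whose domain part fails server-name validation, even when they split cleanly on the sigil and colon. A small usage sketch using the `UserID` subclass (the exact set of rejected domains depends on `parse_and_validate_server_name`):

from synapse.types import UserID

assert UserID.is_valid("@alice:example.com")       # well-formed user ID
assert not UserID.is_valid("alice:example.com")    # missing the '@' sigil
assert not UserID.is_valid("@alice:not a domain")  # spaces fail server-name validation
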
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 06faeebe7f..f7b4857a84 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -13,8 +13,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import heapq
from itertools import islice
-from typing import Iterable, Iterator, Sequence, Tuple, TypeVar
+from typing import (
+ Dict,
+ Generator,
+ Iterable,
+ Iterator,
+ Mapping,
+ Sequence,
+ Set,
+ Tuple,
+ TypeVar,
+)
+
+from synapse.types import Collection
T = TypeVar("T")
@@ -46,3 +59,41 @@ def chunk_seq(iseq: ISeq, maxlen: int) -> Iterable[ISeq]:
If the input is empty, no chunks are returned.
"""
return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))
+
+
+def sorted_topologically(
+ nodes: Iterable[T], graph: Mapping[T, Collection[T]],
+) -> Generator[T, None, None]:
+ """Given a set of nodes and a graph, yield the nodes in toplogical order.
+
+ For example `sorted_topologically([1, 2], {1: [2]})` will yield `2, 1`.
+ """
+
+ # This is implemented by Kahn's algorithm.
+
+ degree_map = {node: 0 for node in nodes}
+ reverse_graph = {} # type: Dict[T, Set[T]]
+
+ for node, edges in graph.items():
+ if node not in degree_map:
+ continue
+
+ for edge in edges:
+ if edge in degree_map:
+ degree_map[node] += 1
+
+ reverse_graph.setdefault(edge, set()).add(node)
+ reverse_graph.setdefault(node, set())
+
+ zero_degree = [node for node, degree in degree_map.items() if degree == 0]
+ heapq.heapify(zero_degree)
+
+ while zero_degree:
+ node = heapq.heappop(zero_degree)
+ yield node
+
+        for edge in reverse_graph.get(node, []):
+ if edge in degree_map:
+ degree_map[edge] -= 1
+ if degree_map[edge] == 0:
+ heapq.heappush(zero_degree, edge)
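
A quick usage sketch of the new helper with illustrative values: `graph` maps each node to the nodes it depends on, and every dependency is yielded before its dependents (the heap makes the order deterministic when several nodes are ready):

from synapse.util.iterutils import sorted_topologically

graph = {
    "power": ["create", "join"],
    "join": ["create"],
    "invite": ["create", "join", "power"],
}
nodes = ["create", "join", "power", "invite"]

print(list(sorted_topologically(nodes, graph)))
# ['create', 'join', 'power', 'invite']
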
diff --git a/tests/handlers/test_cas.py b/tests/handlers/test_cas.py
index bd7a1b6891..c37bb6440e 100644
--- a/tests/handlers/test_cas.py
+++ b/tests/handlers/test_cas.py
@@ -118,4 +118,4 @@ class CasHandlerTestCase(HomeserverTestCase):
def _mock_request():
"""Returns a mock which will stand in as a SynapseRequest"""
- return Mock(spec=["getClientIP", "get_user_agent"])
+ return Mock(spec=["getClientIP", "getHeader"])
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index f5df657814..2abd7a83b5 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -14,7 +14,7 @@
# limitations under the License.
import json
import re
-from typing import Dict
+from typing import Dict, Optional
from urllib.parse import parse_qs, urlencode, urlparse
from mock import ANY, Mock, patch
@@ -349,9 +349,13 @@ class OidcHandlerTestCase(HomeserverTestCase):
cookie = args[1]
macaroon = pymacaroons.Macaroon.deserialize(cookie)
- state = self.handler._get_value_from_macaroon(macaroon, "state")
- nonce = self.handler._get_value_from_macaroon(macaroon, "nonce")
- redirect = self.handler._get_value_from_macaroon(
+ state = self.handler._token_generator._get_value_from_macaroon(
+ macaroon, "state"
+ )
+ nonce = self.handler._token_generator._get_value_from_macaroon(
+ macaroon, "nonce"
+ )
+ redirect = self.handler._token_generator._get_value_from_macaroon(
macaroon, "client_redirect_url"
)
@@ -411,12 +415,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
client_redirect_url = "http://client/redirect"
user_agent = "Browser"
ip_address = "10.0.0.1"
- session = self.handler._generate_oidc_session_token(
- state=state,
- nonce=nonce,
- client_redirect_url=client_redirect_url,
- ui_auth_session_id=None,
- )
+ session = self._generate_oidc_session_token(state, nonce, client_redirect_url)
request = _build_callback_request(
code, state, session, user_agent=user_agent, ip_address=ip_address
)
@@ -500,11 +499,8 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.assertRenderedError("invalid_session")
# Mismatching session
- session = self.handler._generate_oidc_session_token(
- state="state",
- nonce="nonce",
- client_redirect_url="http://client/redirect",
- ui_auth_session_id=None,
+ session = self._generate_oidc_session_token(
+ state="state", nonce="nonce", client_redirect_url="http://client/redirect",
)
request.args = {}
request.args[b"state"] = [b"mismatching state"]
@@ -623,11 +619,8 @@ class OidcHandlerTestCase(HomeserverTestCase):
state = "state"
client_redirect_url = "http://client/redirect"
- session = self.handler._generate_oidc_session_token(
- state=state,
- nonce="nonce",
- client_redirect_url=client_redirect_url,
- ui_auth_session_id=None,
+ session = self._generate_oidc_session_token(
+ state=state, nonce="nonce", client_redirect_url=client_redirect_url,
)
request = _build_callback_request("code", state, session)
@@ -841,6 +834,24 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
self.assertRenderedError("mapping_error", "localpart is invalid: ")
+ def _generate_oidc_session_token(
+ self,
+ state: str,
+ nonce: str,
+ client_redirect_url: str,
+ ui_auth_session_id: Optional[str] = None,
+ ) -> str:
+ from synapse.handlers.oidc_handler import OidcSessionData
+
+ return self.handler._token_generator.generate_oidc_session_token(
+ state=state,
+ session_data=OidcSessionData(
+ nonce=nonce,
+ client_redirect_url=client_redirect_url,
+ ui_auth_session_id=ui_auth_session_id,
+ ),
+ )
+
class UsernamePickerTestCase(HomeserverTestCase):
if not HAS_OIDC:
@@ -965,17 +976,19 @@ async def _make_callback_with_userinfo(
userinfo: the OIDC userinfo dict
client_redirect_url: the URL to redirect to on success.
"""
+ from synapse.handlers.oidc_handler import OidcSessionData
+
handler = hs.get_oidc_handler()
handler._exchange_code = simple_async_mock(return_value={})
handler._parse_id_token = simple_async_mock(return_value=userinfo)
handler._fetch_userinfo = simple_async_mock(return_value=userinfo)
state = "state"
- session = handler._generate_oidc_session_token(
+ session = handler._token_generator.generate_oidc_session_token(
state=state,
- nonce="nonce",
- client_redirect_url=client_redirect_url,
- ui_auth_session_id=None,
+ session_data=OidcSessionData(
+ nonce="nonce", client_redirect_url=client_redirect_url,
+ ),
)
request = _build_callback_request("code", state, session)
@@ -1011,7 +1024,7 @@ def _build_callback_request(
"addCookie",
"requestHeaders",
"getClientIP",
- "get_user_agent",
+ "getHeader",
]
)
@@ -1020,5 +1033,4 @@ def _build_callback_request(
request.args[b"code"] = [code.encode("utf-8")]
request.args[b"state"] = [state.encode("utf-8")]
request.getClientIP.return_value = ip_address
- request.get_user_agent.return_value = user_agent
return request
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index 50955ade97..75275f0e4f 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -105,6 +105,21 @@ class ProfileTestCase(unittest.TestCase):
"Frank",
)
+ # Set displayname to an empty string
+ yield defer.ensureDeferred(
+ self.handler.set_displayname(
+ self.frank, synapse.types.create_requester(self.frank), ""
+ )
+ )
+
+ self.assertIsNone(
+ (
+ yield defer.ensureDeferred(
+ self.store.get_profile_displayname(self.frank.localpart)
+ )
+ )
+ )
+
@defer.inlineCallbacks
def test_set_my_name_if_disabled(self):
self.hs.config.enable_set_displayname = False
@@ -223,6 +238,21 @@ class ProfileTestCase(unittest.TestCase):
"http://my.server/me.png",
)
+ # Set avatar to an empty string
+ yield defer.ensureDeferred(
+ self.handler.set_avatar_url(
+ self.frank, synapse.types.create_requester(self.frank), "",
+ )
+ )
+
+ self.assertIsNone(
+ (
+ yield defer.ensureDeferred(
+ self.store.get_profile_avatar_url(self.frank.localpart)
+ )
+ ),
+ )
+
@defer.inlineCallbacks
def test_set_my_avatar_if_disabled(self):
self.hs.config.enable_set_avatar_url = False
diff --git a/tests/handlers/test_saml.py b/tests/handlers/test_saml.py
index 548038214b..261c7083d1 100644
--- a/tests/handlers/test_saml.py
+++ b/tests/handlers/test_saml.py
@@ -262,4 +262,4 @@ class SamlHandlerTestCase(HomeserverTestCase):
def _mock_request():
"""Returns a mock which will stand in as a SynapseRequest"""
- return Mock(spec=["getClientIP", "get_user_agent"])
+ return Mock(spec=["getClientIP", "getHeader"])
diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py
index 212484a7fe..9c52c8fdca 100644
--- a/tests/http/test_fedclient.py
+++ b/tests/http/test_fedclient.py
@@ -560,4 +560,4 @@ class FederationClientTests(HomeserverTestCase):
self.pump()
f = self.failureResultOf(test_d)
- self.assertIsInstance(f.value, ValueError)
+ self.assertIsInstance(f.value, RequestSendFailed)
diff --git a/tests/http/test_proxyagent.py b/tests/http/test_proxyagent.py
index f7575f59fb..4e1a5a5138 100644
--- a/tests/http/test_proxyagent.py
+++ b/tests/http/test_proxyagent.py
@@ -354,51 +354,6 @@ class MatrixFederationAgentTests(TestCase):
body = self.successResultOf(treq.content(resp))
self.assertEqual(body, b"result")
- @patch.dict(os.environ, {"http_proxy": "proxy.com:8888"})
- def test_http_request_via_proxy_with_blacklist(self):
- # The blacklist includes the configured proxy IP.
- agent = ProxyAgent(
- BlacklistingReactorWrapper(
- self.reactor, ip_whitelist=None, ip_blacklist=IPSet(["1.0.0.0/8"])
- ),
- self.reactor,
- use_proxy=True,
- )
-
- self.reactor.lookups["proxy.com"] = "1.2.3.5"
- d = agent.request(b"GET", b"http://test.com")
-
- # there should be a pending TCP connection
- clients = self.reactor.tcpClients
- self.assertEqual(len(clients), 1)
- (host, port, client_factory, _timeout, _bindAddress) = clients[0]
- self.assertEqual(host, "1.2.3.5")
- self.assertEqual(port, 8888)
-
- # make a test server, and wire up the client
- http_server = self._make_connection(
- client_factory, _get_test_protocol_factory()
- )
-
- # the FakeTransport is async, so we need to pump the reactor
- self.reactor.advance(0)
-
- # now there should be a pending request
- self.assertEqual(len(http_server.requests), 1)
-
- request = http_server.requests[0]
- self.assertEqual(request.method, b"GET")
- self.assertEqual(request.path, b"http://test.com")
- self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"test.com"])
- request.write(b"result")
- request.finish()
-
- self.reactor.advance(0)
-
- resp = self.successResultOf(d)
- body = self.successResultOf(treq.content(resp))
- self.assertEqual(body, b"result")
-
@patch.dict(os.environ, {"HTTPS_PROXY": "proxy.com"})
def test_https_request_via_uppercase_proxy_with_blacklist(self):
# The blacklist includes the configured proxy IP.
@@ -484,50 +439,7 @@ class MatrixFederationAgentTests(TestCase):
body = self.successResultOf(treq.content(resp))
self.assertEqual(body, b"result")
- def test_http_request_via_proxy_with_blacklist(self):
- # The blacklist includes the configured proxy IP.
- agent = ProxyAgent(
- BlacklistingReactorWrapper(
- self.reactor, ip_whitelist=None, ip_blacklist=IPSet(["1.0.0.0/8"])
- ),
- self.reactor,
- http_proxy=b"proxy.com:8888",
- )
-
- self.reactor.lookups["proxy.com"] = "1.2.3.5"
- d = agent.request(b"GET", b"http://test.com")
-
- # there should be a pending TCP connection
- clients = self.reactor.tcpClients
- self.assertEqual(len(clients), 1)
- (host, port, client_factory, _timeout, _bindAddress) = clients[0]
- self.assertEqual(host, "1.2.3.5")
- self.assertEqual(port, 8888)
-
- # make a test server, and wire up the client
- http_server = self._make_connection(
- client_factory, _get_test_protocol_factory()
- )
-
- # the FakeTransport is async, so we need to pump the reactor
- self.reactor.advance(0)
-
- # now there should be a pending request
- self.assertEqual(len(http_server.requests), 1)
-
- request = http_server.requests[0]
- self.assertEqual(request.method, b"GET")
- self.assertEqual(request.path, b"http://test.com")
- self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"test.com"])
- request.write(b"result")
- request.finish()
-
- self.reactor.advance(0)
-
- resp = self.successResultOf(d)
- body = self.successResultOf(treq.content(resp))
- self.assertEqual(body, b"result")
-
+ @patch.dict(os.environ, {"https_proxy": "proxy.com"})
def test_https_request_via_proxy_with_blacklist(self):
# The blacklist includes the configured proxy IP.
agent = ProxyAgent(
@@ -536,7 +448,7 @@ class MatrixFederationAgentTests(TestCase):
),
self.reactor,
contextFactory=get_test_https_policy(),
- https_proxy=b"proxy.com",
+ use_proxy=True,
)
self.reactor.lookups["proxy.com"] = "1.2.3.5"
diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index 0504cd187e..586b877bda 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -58,8 +58,6 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
-
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -156,7 +154,6 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
- self.hs = hs
# Allow for uploading and downloading to/from the media repo
self.media_repo = hs.get_media_repository_resource()
diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py
index aa389df12f..d0090faa4f 100644
--- a/tests/rest/admin/test_event_reports.py
+++ b/tests/rest/admin/test_event_reports.py
@@ -32,8 +32,6 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
-
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -371,8 +369,6 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
-
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py
index c2b998cdae..51a7731693 100644
--- a/tests/rest/admin/test_media.py
+++ b/tests/rest/admin/test_media.py
@@ -35,7 +35,6 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.handler = hs.get_device_handler()
self.media_repo = hs.get_media_repository_resource()
self.server_name = hs.hostname
@@ -181,7 +180,6 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.handler = hs.get_device_handler()
self.media_repo = hs.get_media_repository_resource()
self.server_name = hs.hostname
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index fa620f97f3..a0f32c5512 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -605,8 +605,6 @@ class RoomTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
-
# Create user
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
diff --git a/tests/rest/admin/test_statistics.py b/tests/rest/admin/test_statistics.py
index 73f8a8ec99..f48be3d65a 100644
--- a/tests/rest/admin/test_statistics.py
+++ b/tests/rest/admin/test_statistics.py
@@ -31,7 +31,6 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
self.media_repo = hs.get_media_repository_resource()
self.admin_user = self.register_user("admin", "pass", admin=True)
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 9b2e4765f6..78b48aa0b9 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -25,6 +25,7 @@ from mock import Mock
import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
+from synapse.api.room_versions import RoomVersions
from synapse.rest.client.v1 import login, logout, profile, room
from synapse.rest.client.v2_alpha import devices, sync
@@ -587,6 +588,205 @@ class UsersListTestCase(unittest.HomeserverTestCase):
_search_test(None, "bar", "user_id")
+class DeactivateAccountTestCase(unittest.HomeserverTestCase):
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs):
+ self.store = hs.get_datastore()
+
+ self.admin_user = self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+
+ self.other_user = self.register_user("user", "pass", displayname="User1")
+ self.other_user_token = self.login("user", "pass")
+ self.url_other_user = "/_synapse/admin/v2/users/%s" % urllib.parse.quote(
+ self.other_user
+ )
+ self.url = "/_synapse/admin/v1/deactivate/%s" % urllib.parse.quote(
+ self.other_user
+ )
+
+ # set attributes for user
+ self.get_success(
+ self.store.set_profile_avatar_url("user", "mxc://servername/mediaid", 1)
+ )
+ self.get_success(
+ self.store.user_add_threepid("@user:test", "email", "foo@bar.com", 0, 0)
+ )
+
+ def test_no_auth(self):
+ """
+ Try to deactivate users without authentication.
+ """
+ channel = self.make_request("POST", self.url, b"{}")
+
+ self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+ def test_requester_is_not_admin(self):
+ """
+ If the user is not a server admin, an error is returned.
+ """
+ url = "/_synapse/admin/v1/deactivate/@bob:test"
+
+ channel = self.make_request("POST", url, access_token=self.other_user_token)
+
+ self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("You are not a server admin", channel.json_body["error"])
+
+ channel = self.make_request(
+ "POST", url, access_token=self.other_user_token, content=b"{}",
+ )
+
+ self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("You are not a server admin", channel.json_body["error"])
+
+ def test_user_does_not_exist(self):
+ """
+ Tests that deactivation for a user that does not exist returns a 404
+ """
+
+ channel = self.make_request(
+ "POST",
+ "/_synapse/admin/v1/deactivate/@unknown_person:test",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
+ def test_erase_is_not_bool(self):
+ """
+        If the parameter `erase` is not a boolean, an error is returned
+ """
+ body = json.dumps({"erase": "False"})
+
+ channel = self.make_request(
+ "POST",
+ self.url,
+ content=body.encode(encoding="utf_8"),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
+
+ def test_user_is_not_local(self):
+ """
+        Tests that deactivation for a user that is not local returns a 400
+ """
+ url = "/_synapse/admin/v1/deactivate/@unknown_person:unknown_domain"
+
+ channel = self.make_request("POST", url, access_token=self.admin_user_tok)
+
+ self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual("Can only deactivate local users", channel.json_body["error"])
+
+ def test_deactivate_user_erase_true(self):
+ """
+        Test deactivating a user and setting `erase` to `true`
+ """
+
+ # Get user
+ channel = self.make_request(
+ "GET", self.url_other_user, access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("@user:test", channel.json_body["name"])
+ self.assertEqual(False, channel.json_body["deactivated"])
+ self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
+ self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"])
+ self.assertEqual("User1", channel.json_body["displayname"])
+
+ # Deactivate user
+ body = json.dumps({"erase": True})
+
+ channel = self.make_request(
+ "POST",
+ self.url,
+ access_token=self.admin_user_tok,
+ content=body.encode(encoding="utf_8"),
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Get user
+ channel = self.make_request(
+ "GET", self.url_other_user, access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("@user:test", channel.json_body["name"])
+ self.assertEqual(True, channel.json_body["deactivated"])
+ self.assertEqual(0, len(channel.json_body["threepids"]))
+ self.assertIsNone(channel.json_body["avatar_url"])
+ self.assertIsNone(channel.json_body["displayname"])
+
+ self._is_erased("@user:test", True)
+
+ def test_deactivate_user_erase_false(self):
+ """
+        Test deactivating a user and setting `erase` to `false`
+ """
+
+ # Get user
+ channel = self.make_request(
+ "GET", self.url_other_user, access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("@user:test", channel.json_body["name"])
+ self.assertEqual(False, channel.json_body["deactivated"])
+ self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
+ self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"])
+ self.assertEqual("User1", channel.json_body["displayname"])
+
+ # Deactivate user
+ body = json.dumps({"erase": False})
+
+ channel = self.make_request(
+ "POST",
+ self.url,
+ access_token=self.admin_user_tok,
+ content=body.encode(encoding="utf_8"),
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Get user
+ channel = self.make_request(
+ "GET", self.url_other_user, access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("@user:test", channel.json_body["name"])
+ self.assertEqual(True, channel.json_body["deactivated"])
+ self.assertEqual(0, len(channel.json_body["threepids"]))
+
+ # On DINUM's deployment we clear the profile information during a deactivation regardless,
+ # whereas on mainline we decided to only do this if the deactivation was performed with erase: True.
+ # The discrepancy is due to profile replication.
+ # See synapse.storage.databases.main.profile.ProfileWorkerStore.set_profiles_active
+ self.assertIsNone(channel.json_body["avatar_url"])
+ self.assertIsNone(channel.json_body["displayname"])
+
+ self._is_erased("@user:test", False)
+
+ def _is_erased(self, user_id: str, expect: bool) -> None:
+ """Assert that the user is erased or not
+ """
+ d = self.store.is_user_erased(user_id)
+ if expect:
+ self.assertTrue(self.get_success(d))
+ else:
+ self.assertFalse(self.get_success(d))
+
+
class UserRestTestCase(unittest.HomeserverTestCase):
servlets = [
@@ -986,6 +1186,31 @@ class UserRestTestCase(unittest.HomeserverTestCase):
Test deactivating another user.
"""
+ # set attributes for user
+ self.get_success(
+ self.store.set_profile_avatar_url("user", "mxc://servername/mediaid", 1)
+ )
+ self.get_success(
+ self.store.user_add_threepid("@user:test", "email", "foo@bar.com", 0, 0)
+ )
+
+ # Get user
+ channel = self.make_request(
+ "GET", self.url_other_user, access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("@user:test", channel.json_body["name"])
+ self.assertEqual(False, channel.json_body["deactivated"])
+ self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
+
+ # On DINUM's deployment we clear the profile information during a deactivation regardless,
+ # whereas on mainline we decided to only do this if the deactivation was performed with erase: True.
+ # The discrepancy is due to profile replication.
+ # See synapse.storage.databases.main.profile.ProfileWorkerStore.set_profiles_active
+ self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"])
+ self.assertEqual("User", channel.json_body["displayname"])
+
# Deactivate user
body = json.dumps({"deactivated": True})
@@ -999,6 +1224,14 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
+ self.assertEqual(0, len(channel.json_body["threepids"]))
+
+ # On DINUM's deployment we clear the profile information during a deactivation regardless,
+ # whereas on mainline we decided to only do this if the deactivation was performed with erase: True.
+ # The discrepancy is due to profile replication.
+ # See synapse.storage.databases.main.profile.ProfileWorkerStore.set_profiles_active
+ self.assertIsNone(channel.json_body["avatar_url"])
+ self.assertIsNone(channel.json_body["displayname"])
# the user is deactivated, the threepid will be deleted
# Get user
@@ -1009,6 +1242,9 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
+ self.assertEqual(0, len(channel.json_body["threepids"]))
+ self.assertIsNone(channel.json_body["avatar_url"])
+ self.assertIsNone(channel.json_body["displayname"])
@override_config({"user_directory": {"enabled": True, "search_all_users": True}})
def test_change_name_deactivate_user_user_directory(self):
@@ -1204,8 +1440,6 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
-
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -1236,24 +1470,26 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
def test_user_does_not_exist(self):
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns an empty list
"""
url = "/_synapse/admin/v1/users/@unknown_person:test/joined_rooms"
channel = self.make_request("GET", url, access_token=self.admin_user_tok,)
- self.assertEqual(404, channel.code, msg=channel.json_body)
- self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(0, channel.json_body["total"])
+ self.assertEqual(0, len(channel.json_body["joined_rooms"]))
def test_user_is_not_local(self):
"""
- Tests that a lookup for a user that is not a local returns a 400
+        Tests that a lookup for a non-local user with no room memberships returns an empty list
"""
url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/joined_rooms"
channel = self.make_request("GET", url, access_token=self.admin_user_tok,)
- self.assertEqual(400, channel.code, msg=channel.json_body)
- self.assertEqual("Can only lookup local users", channel.json_body["error"])
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(0, channel.json_body["total"])
+ self.assertEqual(0, len(channel.json_body["joined_rooms"]))
def test_no_memberships(self):
"""
@@ -1284,6 +1520,49 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(number_rooms, channel.json_body["total"])
self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
+ def test_get_rooms_with_nonlocal_user(self):
+ """
+ Tests that a normal lookup for rooms is successful with a non-local user
+ """
+
+ other_user_tok = self.login("user", "pass")
+ event_builder_factory = self.hs.get_event_builder_factory()
+ event_creation_handler = self.hs.get_event_creation_handler()
+ storage = self.hs.get_storage()
+
+ # Create two rooms, one with a local user only and one with both a local
+ # and remote user.
+ self.helper.create_room_as(self.other_user, tok=other_user_tok)
+ local_and_remote_room_id = self.helper.create_room_as(
+ self.other_user, tok=other_user_tok
+ )
+
+ # Add a remote user to the room.
+ builder = event_builder_factory.for_room_version(
+ RoomVersions.V1,
+ {
+ "type": "m.room.member",
+ "sender": "@joiner:remote_hs",
+ "state_key": "@joiner:remote_hs",
+ "room_id": local_and_remote_room_id,
+ "content": {"membership": "join"},
+ },
+ )
+
+ event, context = self.get_success(
+ event_creation_handler.create_new_client_event(builder)
+ )
+
+ self.get_success(storage.persistence.persist_event(event, context))
+
+ # Now get rooms
+ url = "/_synapse/admin/v1/users/@joiner:remote_hs/joined_rooms"
+ channel = self.make_request("GET", url, access_token=self.admin_user_tok,)
+
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(1, channel.json_body["total"])
+ self.assertEqual([local_and_remote_room_id], channel.json_body["joined_rooms"])
+
class PushersRestTestCase(unittest.HomeserverTestCase):
@@ -1401,7 +1680,6 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
self.media_repo = hs.get_media_repository_resource()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -1868,8 +2146,6 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
- self.store = hs.get_datastore()
-
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py
index 1d1dc9f8a2..f9b8011961 100644
--- a/tests/rest/client/v1/test_login.py
+++ b/tests/rest/client/v1/test_login.py
@@ -30,6 +30,7 @@ from synapse.rest.client.v1 import login, logout
from synapse.rest.client.v2_alpha import devices, register
from synapse.rest.client.v2_alpha.account import WhoamiRestServlet
from synapse.rest.synapse.client.pick_idp import PickIdpResource
+from synapse.types import create_requester
from tests import unittest
from tests.handlers.test_oidc import HAS_OIDC
@@ -667,7 +668,9 @@ class CASTestCase(unittest.HomeserverTestCase):
# Deactivate the account.
self.get_success(
- self.deactivate_account_handler.deactivate_account(self.user_id, False)
+ self.deactivate_account_handler.deactivate_account(
+ self.user_id, False, create_requester(self.user_id)
+ )
)
# Request the CAS ticket.
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 6105eac47c..d4e3165436 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -29,7 +29,7 @@ from synapse.handlers.pagination import PurgeStatus
from synapse.rest import admin
from synapse.rest.client.v1 import directory, login, profile, room
from synapse.rest.client.v2_alpha import account
-from synapse.types import JsonDict, RoomAlias, UserID
+from synapse.types import JsonDict, RoomAlias, UserID, create_requester
from synapse.util.stringutils import random_string
from tests import unittest
@@ -1687,7 +1687,9 @@ class ContextTestCase(unittest.HomeserverTestCase):
deactivate_account_handler = self.hs.get_deactivate_account_handler()
self.get_success(
- deactivate_account_handler.deactivate_account(self.user_id, erase_data=True)
+ deactivate_account_handler.deactivate_account(
+ self.user_id, True, create_requester(self.user_id)
+ )
)
# Invite another user in the room. This is needed because messages will be
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
new file mode 100644
index 0000000000..83c377824b
--- /dev/null
+++ b/tests/storage/test_event_chain.py
@@ -0,0 +1,472 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, List, Tuple
+
+from twisted.trial import unittest
+
+from synapse.api.constants import EventTypes
+from synapse.api.room_versions import RoomVersions
+from synapse.events import EventBase
+from synapse.storage.databases.main.events import _LinkMap
+
+from tests.unittest import HomeserverTestCase
+
+
+class EventChainStoreTestCase(HomeserverTestCase):
+ def prepare(self, reactor, clock, hs):
+ self.store = hs.get_datastore()
+ self._next_stream_ordering = 1
+
+ def test_simple(self):
+ """Test that the example in `docs/auth_chain_difference_algorithm.md`
+ works.
+ """
+
+ event_factory = self.hs.get_event_builder_factory()
+ bob = "@creator:test"
+ alice = "@alice:test"
+ room_id = "!room:test"
+
+ # Ensure that we have a rooms entry so that we generate the chain index.
+ self.get_success(
+ self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id="",
+ is_public=True,
+ room_version=RoomVersions.V6,
+ )
+ )
+
+ create = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Create,
+ "state_key": "",
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "create"},
+ },
+ ).build(prev_event_ids=[], auth_event_ids=[])
+ )
+
+ bob_join = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": bob,
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "bob_join"},
+ },
+ ).build(prev_event_ids=[], auth_event_ids=[create.event_id])
+ )
+
+ power = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.PowerLevels,
+ "state_key": "",
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "power"},
+ },
+ ).build(
+ prev_event_ids=[], auth_event_ids=[create.event_id, bob_join.event_id],
+ )
+ )
+
+ alice_invite = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": alice,
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "alice_invite"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
+ )
+ )
+
+ alice_join = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": alice,
+ "sender": alice,
+ "room_id": room_id,
+ "content": {"tag": "alice_join"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, alice_invite.event_id, power.event_id],
+ )
+ )
+
+ power_2 = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.PowerLevels,
+ "state_key": "",
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "power_2"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
+ )
+ )
+
+ bob_join_2 = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": bob,
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "bob_join_2"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
+ )
+ )
+
+ alice_join2 = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": alice,
+ "sender": alice,
+ "room_id": room_id,
+ "content": {"tag": "alice_join2"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[
+ create.event_id,
+ alice_join.event_id,
+ power_2.event_id,
+ ],
+ )
+ )
+
+ events = [
+ create,
+ bob_join,
+ power,
+ alice_invite,
+ alice_join,
+ bob_join_2,
+ power_2,
+ alice_join2,
+ ]
+
+ expected_links = [
+ (bob_join, create),
+ (power, create),
+ (power, bob_join),
+ (alice_invite, create),
+ (alice_invite, power),
+ (alice_invite, bob_join),
+ (bob_join_2, power),
+ (alice_join2, power_2),
+ ]
+
+ self.persist(events)
+ chain_map, link_map = self.fetch_chains(events)
+
+ # Check that the expected links and only the expected links have been
+ # added.
+ self.assertEqual(len(expected_links), len(list(link_map.get_additions())))
+
+ for start, end in expected_links:
+ start_id, start_seq = chain_map[start.event_id]
+ end_id, end_seq = chain_map[end.event_id]
+
+ self.assertIn(
+ (start_seq, end_seq), list(link_map.get_links_between(start_id, end_id))
+ )
+
+ # Test that everything can reach the create event, but the create event
+ # can't reach anything.
+ for event in events[1:]:
+ self.assertTrue(
+ link_map.exists_path_from(
+ chain_map[event.event_id], chain_map[create.event_id]
+ ),
+ )
+
+ self.assertFalse(
+ link_map.exists_path_from(
+ chain_map[create.event_id], chain_map[event.event_id],
+ ),
+ )
+
+ def test_out_of_order_events(self):
+ """Test that we handle persisting events that we don't have the full
+ auth chain for yet (which should only happen for out of band memberships).
+ """
+ event_factory = self.hs.get_event_builder_factory()
+ bob = "@creator:test"
+ alice = "@alice:test"
+ room_id = "!room:test"
+
+ # Ensure that we have a rooms entry so that we generate the chain index.
+ self.get_success(
+ self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id="",
+ is_public=True,
+ room_version=RoomVersions.V6,
+ )
+ )
+
+ # First persist the base room.
+ create = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Create,
+ "state_key": "",
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "create"},
+ },
+ ).build(prev_event_ids=[], auth_event_ids=[])
+ )
+
+ bob_join = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": bob,
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "bob_join"},
+ },
+ ).build(prev_event_ids=[], auth_event_ids=[create.event_id])
+ )
+
+ power = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.PowerLevels,
+ "state_key": "",
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "power"},
+ },
+ ).build(
+ prev_event_ids=[], auth_event_ids=[create.event_id, bob_join.event_id],
+ )
+ )
+
+ self.persist([create, bob_join, power])
+
+ # Now persist an invite and a couple of memberships out of order.
+ alice_invite = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": alice,
+ "sender": bob,
+ "room_id": room_id,
+ "content": {"tag": "alice_invite"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
+ )
+ )
+
+ alice_join = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": alice,
+ "sender": alice,
+ "room_id": room_id,
+ "content": {"tag": "alice_join"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, alice_invite.event_id, power.event_id],
+ )
+ )
+
+ alice_join2 = self.get_success(
+ event_factory.for_room_version(
+ RoomVersions.V6,
+ {
+ "type": EventTypes.Member,
+ "state_key": alice,
+ "sender": alice,
+ "room_id": room_id,
+ "content": {"tag": "alice_join2"},
+ },
+ ).build(
+ prev_event_ids=[],
+ auth_event_ids=[create.event_id, alice_join.event_id, power.event_id],
+ )
+ )
+
+ self.persist([alice_join])
+ self.persist([alice_join2])
+ self.persist([alice_invite])
+
+ # The end result should be sane.
+ events = [create, bob_join, power, alice_invite, alice_join]
+
+ chain_map, link_map = self.fetch_chains(events)
+
+ expected_links = [
+ (bob_join, create),
+ (power, create),
+ (power, bob_join),
+ (alice_invite, create),
+ (alice_invite, power),
+ (alice_invite, bob_join),
+ ]
+
+ # Check that the expected links and only the expected links have been
+ # added.
+ self.assertEqual(len(expected_links), len(list(link_map.get_additions())))
+
+ for start, end in expected_links:
+ start_id, start_seq = chain_map[start.event_id]
+ end_id, end_seq = chain_map[end.event_id]
+
+ self.assertIn(
+ (start_seq, end_seq), list(link_map.get_links_between(start_id, end_id))
+ )
+
+ def persist(
+ self, events: List[EventBase],
+ ):
+ """Persist the given events and check that the links generated match
+ those given.
+ """
+
+ persist_events_store = self.hs.get_datastores().persist_events
+
+ for e in events:
+ e.internal_metadata.stream_ordering = self._next_stream_ordering
+ self._next_stream_ordering += 1
+
+ def _persist(txn):
+ # We need to persist the events to the events and state_events
+ # tables.
+ persist_events_store._store_event_txn(txn, [(e, {}) for e in events])
+
+ # Actually call the function that calculates the auth chain stuff.
+ persist_events_store._persist_event_auth_chain_txn(txn, events)
+
+ self.get_success(
+ persist_events_store.db_pool.runInteraction("_persist", _persist,)
+ )
+
+ def fetch_chains(
+ self, events: List[EventBase]
+ ) -> Tuple[Dict[str, Tuple[int, int]], _LinkMap]:
+
+ # Fetch the map from event ID -> (chain ID, sequence number)
+ rows = self.get_success(
+ self.store.db_pool.simple_select_many_batch(
+ table="event_auth_chains",
+ column="event_id",
+ iterable=[e.event_id for e in events],
+ retcols=("event_id", "chain_id", "sequence_number"),
+ keyvalues={},
+ )
+ )
+
+ chain_map = {
+ row["event_id"]: (row["chain_id"], row["sequence_number"]) for row in rows
+ }
+
+ # Fetch all the links and pass them to the _LinkMap.
+ rows = self.get_success(
+ self.store.db_pool.simple_select_many_batch(
+ table="event_auth_chain_links",
+ column="origin_chain_id",
+ iterable=[chain_id for chain_id, _ in chain_map.values()],
+ retcols=(
+ "origin_chain_id",
+ "origin_sequence_number",
+ "target_chain_id",
+ "target_sequence_number",
+ ),
+ keyvalues={},
+ )
+ )
+
+ link_map = _LinkMap()
+ for row in rows:
+ added = link_map.add_link(
+ (row["origin_chain_id"], row["origin_sequence_number"]),
+ (row["target_chain_id"], row["target_sequence_number"]),
+ )
+
+ # We shouldn't have persisted any redundant links
+ self.assertTrue(added)
+
+ return chain_map, link_map
+
+
+class LinkMapTestCase(unittest.TestCase):
+ def test_simple(self):
+ """Basic tests for the LinkMap.
+ """
+ link_map = _LinkMap()
+
+ link_map.add_link((1, 1), (2, 1), new=False)
+ self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1)])
+ self.assertCountEqual(link_map.get_links_from((1, 1)), [(2, 1)])
+ self.assertCountEqual(link_map.get_additions(), [])
+ self.assertTrue(link_map.exists_path_from((1, 5), (2, 1)))
+ self.assertFalse(link_map.exists_path_from((1, 5), (2, 2)))
+ self.assertTrue(link_map.exists_path_from((1, 5), (1, 1)))
+ self.assertFalse(link_map.exists_path_from((1, 1), (1, 5)))
+
+ # Attempting to add a redundant link is ignored.
+ self.assertFalse(link_map.add_link((1, 4), (2, 1)))
+ self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1)])
+
+ # Adding new non-redundant links works
+ self.assertTrue(link_map.add_link((1, 3), (2, 3)))
+ self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1), (3, 3)])
+
+ self.assertTrue(link_map.add_link((2, 5), (1, 3)))
+ self.assertCountEqual(link_map.get_links_between(2, 1), [(5, 3)])
+ self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1), (3, 3)])
+
+ self.assertCountEqual(link_map.get_additions(), [(1, 3, 2, 3), (2, 5, 1, 3)])
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 482506d731..9d04a066d8 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -13,6 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import attr
+from parameterized import parameterized
+
+from synapse.events import _EventInternalMetadata
+
import tests.unittest
import tests.utils
@@ -113,7 +118,8 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
r = self.get_success(self.store.get_rooms_with_many_extremities(5, 1, [room1]))
self.assertTrue(r == [room2] or r == [room3])
- def test_auth_difference(self):
+ @parameterized.expand([(True,), (False,)])
+ def test_auth_difference(self, use_chain_cover_index: bool):
room_id = "@ROOM:local"
# The silly auth graph we use to test the auth difference algorithm,
@@ -159,46 +165,223 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
"j": 1,
}
+ # Mark the room as having a chain cover index (or not), depending on the test parameter.
+
+ def store_room(txn):
+ self.store.db_pool.simple_insert_txn(
+ txn,
+ "rooms",
+ {
+ "room_id": room_id,
+ "creator": "room_creator_user_id",
+ "is_public": True,
+ "room_version": "6",
+ "has_auth_chain_index": use_chain_cover_index,
+ },
+ )
+
+ self.get_success(self.store.db_pool.runInteraction("store_room", store_room))
+
# We rudely fiddle with the appropriate tables directly, as that's much
# easier than constructing events properly.
- def insert_event(txn, event_id, stream_ordering):
+ def insert_event(txn):
+ stream_ordering = 0
+
+ for event_id in auth_graph:
+ stream_ordering += 1
+ depth = depth_map[event_id]
+
+ self.store.db_pool.simple_insert_txn(
+ txn,
+ table="events",
+ values={
+ "event_id": event_id,
+ "room_id": room_id,
+ "depth": depth,
+ "topological_ordering": depth,
+ "type": "m.test",
+ "processed": True,
+ "outlier": False,
+ "stream_ordering": stream_ordering,
+ },
+ )
+
+ self.hs.datastores.persist_events._persist_event_auth_chain_txn(
+ txn,
+ [
+ FakeEvent(event_id, room_id, auth_graph[event_id])
+ for event_id in auth_graph
+ ],
+ )
+
+ self.get_success(self.store.db_pool.runInteraction("insert", insert_event,))
+
+ # Now actually test that various combinations give the right result:
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
+ )
+ self.assertSetEqual(difference, {"a", "b"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}])
+ )
+ self.assertSetEqual(difference, {"a", "b", "c", "e", "f"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b"}])
+ )
+ self.assertSetEqual(difference, {"a", "b", "c"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b", "c"}])
+ )
+ self.assertSetEqual(difference, {"a", "b"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"d"}])
+ )
+ self.assertSetEqual(difference, {"a", "b", "d", "e"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}, {"d"}])
+ )
+ self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"e"}])
+ )
+ self.assertSetEqual(difference, {"a", "b"})
+
+ difference = self.get_success(
+ self.store.get_auth_chain_difference(room_id, [{"a"}])
+ )
+ self.assertSetEqual(difference, set())
+
+ def test_auth_difference_partial_cover(self):
+ """Test that we correctly handle rooms where not all events have a chain
+ cover calculated. This can happen in some obscure edge cases, including
+ during the background update that calculates the chain cover for old
+ rooms.
+ """
+
+ room_id = "@ROOM:local"
+
+ # The silly auth graph we use to test the auth difference algorithm,
+ # where the top are the most recent events.
+ #
+ #     A   B
+ #      \ /
+ #   D   E
+ #    \  |
+ #     ` F   C
+ #       |  /|
+ #       G ´ |
+ #       | \ |
+ #       H   I
+ #       |   |
+ #       K   J
+
+ auth_graph = {
+ "a": ["e"],
+ "b": ["e"],
+ "c": ["g", "i"],
+ "d": ["f"],
+ "e": ["f"],
+ "f": ["g"],
+ "g": ["h", "i"],
+ "h": ["k"],
+ "i": ["j"],
+ "k": [],
+ "j": [],
+ }
+
+ depth_map = {
+ "a": 7,
+ "b": 7,
+ "c": 4,
+ "d": 6,
+ "e": 6,
+ "f": 5,
+ "g": 3,
+ "h": 2,
+ "i": 2,
+ "k": 1,
+ "j": 1,
+ }
- depth = depth_map[event_id]
+ # We rudely fiddle with the appropriate tables directly, as that's much
+ # easier than constructing events properly.
+ def insert_event(txn):
+ # First insert the room and mark it as having a chain cover.
self.store.db_pool.simple_insert_txn(
txn,
- table="events",
- values={
- "event_id": event_id,
+ "rooms",
+ {
"room_id": room_id,
- "depth": depth,
- "topological_ordering": depth,
- "type": "m.test",
- "processed": True,
- "outlier": False,
- "stream_ordering": stream_ordering,
+ "creator": "room_creator_user_id",
+ "is_public": True,
+ "room_version": "6",
+ "has_auth_chain_index": True,
},
)
- self.store.db_pool.simple_insert_many_txn(
+ stream_ordering = 0
+
+ for event_id in auth_graph:
+ stream_ordering += 1
+ depth = depth_map[event_id]
+
+ self.store.db_pool.simple_insert_txn(
+ txn,
+ table="events",
+ values={
+ "event_id": event_id,
+ "room_id": room_id,
+ "depth": depth,
+ "topological_ordering": depth,
+ "type": "m.test",
+ "processed": True,
+ "outlier": False,
+ "stream_ordering": stream_ordering,
+ },
+ )
+
+ # Insert all events apart from 'b'.
+ self.hs.datastores.persist_events._persist_event_auth_chain_txn(
txn,
- table="event_auth",
- values=[
- {"event_id": event_id, "room_id": room_id, "auth_id": a}
- for a in auth_graph[event_id]
+ [
+ FakeEvent(event_id, room_id, auth_graph[event_id])
+ for event_id in auth_graph
+ if event_id != "b"
],
)
- next_stream_ordering = 0
- for event_id in auth_graph:
- next_stream_ordering += 1
- self.get_success(
- self.store.db_pool.runInteraction(
- "insert", insert_event, event_id, next_stream_ordering
- )
+ # Now we insert the event 'B' without a chain cover, by temporarily
+ # pretending the room doesn't have a chain cover.
+
+ self.store.db_pool.simple_update_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"has_auth_chain_index": False},
+ )
+
+ self.hs.datastores.persist_events._persist_event_auth_chain_txn(
+ txn, [FakeEvent("b", room_id, auth_graph["b"])],
+ )
+
+ self.store.db_pool.simple_update_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"has_auth_chain_index": True},
)
+ self.get_success(self.store.db_pool.runInteraction("insert", insert_event,))
+
# Now actually test that various combinations give the right result:
difference = self.get_success(
@@ -240,3 +423,21 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
self.store.get_auth_chain_difference(room_id, [{"a"}])
)
self.assertSetEqual(difference, set())
+
+
+@attr.s
+class FakeEvent:
+ event_id = attr.ib()
+ room_id = attr.ib()
+ auth_events = attr.ib()
+
+ type = "foo"
+ state_key = "foo"
+
+ internal_metadata = _EventInternalMetadata({})
+
+ def auth_event_ids(self):
+ return self.auth_events
+
+ def is_state(self):
+ return True
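
The expected sets in these assertions follow the state-resolution-v2 notion of the auth chain difference: expand each state set to its full auth chain (here taken to include the state events themselves, which is what the asserted values encode), then subtract the intersection of those chains from their union. A brute-force sketch over the toy graph used in these tests, purely illustrative and not the storage code under test:

```python
from typing import Dict, List, Set

# The toy auth graph from the tests above: each event maps to its auth events.
auth_graph: Dict[str, List[str]] = {
    "a": ["e"],
    "b": ["e"],
    "c": ["g", "i"],
    "d": ["f"],
    "e": ["f"],
    "f": ["g"],
    "g": ["h", "i"],
    "h": ["k"],
    "i": ["j"],
    "k": [],
    "j": [],
}


def full_auth_chain(state_set: Set[str]) -> Set[str]:
    """The state events themselves plus everything reachable via auth events."""
    chain: Set[str] = set()
    todo = list(state_set)
    while todo:
        event_id = todo.pop()
        if event_id not in chain:
            chain.add(event_id)
            todo.extend(auth_graph[event_id])
    return chain


def auth_chain_difference(state_sets: List[Set[str]]) -> Set[str]:
    """Union of the full auth chains minus their intersection."""
    chains = [full_auth_chain(s) for s in state_sets]
    return set.union(*chains) - set.intersection(*chains)


# Reproduces the first two assertions in test_auth_difference:
assert auth_chain_difference([{"a"}, {"b"}]) == {"a", "b"}
assert auth_chain_difference([{"a"}, {"b"}, {"c"}]) == {"a", "b", "c", "e", "f"}
```

Roughly speaking, the chain cover index answers these reachability questions from the `event_auth_chains` and `event_auth_chain_links` tables rather than by walking `event_auth` event by event; both code paths (toggled via the `use_chain_cover_index` parameter above) must produce the same sets.
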
diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py
index 7a38022e71..b7dde51224 100644
--- a/tests/storage/test_profile.py
+++ b/tests/storage/test_profile.py
@@ -48,6 +48,19 @@ class ProfileStoreTestCase(unittest.TestCase):
),
)
+ # test set to None
+ yield defer.ensureDeferred(
+ self.store.set_profile_displayname(self.u_frank.localpart, None, 2)
+ )
+
+ self.assertIsNone(
+ (
+ yield defer.ensureDeferred(
+ self.store.get_profile_displayname(self.u_frank.localpart)
+ )
+ )
+ )
+
@defer.inlineCallbacks
def test_avatar_url(self):
yield defer.ensureDeferred(self.store.create_profile(self.u_frank.localpart))
@@ -66,3 +79,16 @@ class ProfileStoreTestCase(unittest.TestCase):
)
),
)
+
+ # test set to None
+ yield defer.ensureDeferred(
+ self.store.set_profile_avatar_url(self.u_frank.localpart, None, 2)
+ )
+
+ self.assertIsNone(
+ (
+ yield defer.ensureDeferred(
+ self.store.get_profile_avatar_url(self.u_frank.localpart)
+ )
+ )
+ )
diff --git a/tests/test_types.py b/tests/test_types.py
index d4a722a30f..67ceea6e43 100644
--- a/tests/test_types.py
+++ b/tests/test_types.py
@@ -65,6 +65,10 @@ class RoomAliasTestCase(unittest.HomeserverTestCase):
self.assertEquals(room.to_string(), "#channel:my.domain")
+ def test_validate(self):
+ id_string = "#test:domain,test"
+ self.assertFalse(RoomAlias.is_valid(id_string))
+
class GroupIDTestCase(unittest.TestCase):
def test_parse(self):
diff --git a/tests/util/test_itertools.py b/tests/util/test_itertools.py
index 0ab0a91483..1184cea5a3 100644
--- a/tests/util/test_itertools.py
+++ b/tests/util/test_itertools.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.iterutils import chunk_seq
+from typing import Dict, List
+
+from synapse.util.iterutils import chunk_seq, sorted_topologically
from tests.unittest import TestCase
@@ -45,3 +47,40 @@ class ChunkSeqTests(TestCase):
self.assertEqual(
list(parts), [],
)
+
+
+class SortTopologically(TestCase):
+ def test_empty(self):
+ "Test that an empty graph works correctly"
+
+ graph = {} # type: Dict[int, List[int]]
+ self.assertEqual(list(sorted_topologically([], graph)), [])
+
+ def test_disconnected(self):
+ "Test that a graph with no edges work"
+
+ graph = {1: [], 2: []} # type: Dict[int, List[int]]
+
+ # For disconnected nodes the output is simply sorted.
+ self.assertEqual(list(sorted_topologically([1, 2], graph)), [1, 2])
+
+ def test_linear(self):
+ "Test that a simple `4 -> 3 -> 2 -> 1` graph works"
+
+ graph = {1: [], 2: [1], 3: [2], 4: [3]} # type: Dict[int, List[int]]
+
+ self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
+
+ def test_subset(self):
+ "Test that only sorting a subset of the graph works"
+ graph = {1: [], 2: [1], 3: [2], 4: [3]} # type: Dict[int, List[int]]
+
+ self.assertEqual(list(sorted_topologically([4, 3], graph)), [3, 4])
+
+ def test_fork(self):
+ "Test that a forked graph works"
+ graph = {1: [], 2: [1], 3: [1], 4: [2, 3]} # type: Dict[int, List[int]]
+
+ # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
+ # always get the same one.
+ self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
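
These cases pin down the contract the callers rely on: only the requested nodes are emitted, every node comes after its dependencies, and ties are broken deterministically by the natural ordering of the nodes. A heap-based Kahn's algorithm gives exactly that behaviour; the sketch below uses made-up names and is not the actual `synapse.util.iterutils.sorted_topologically` implementation:

```python
import heapq
from typing import Dict, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def sorted_topologically_sketch(
    nodes: Iterable[T], graph: Dict[T, List[T]]
) -> Iterator[T]:
    """Yield `nodes` so that each node comes after its entries in `graph`.

    `graph[node]` lists the nodes that must sort before `node`; dependencies
    that are not themselves in `nodes` are ignored.
    """
    node_set = set(nodes)

    # Count how many in-set dependencies each node has, and invert the edges.
    degree = {node: 0 for node in node_set}
    dependants: Dict[T, List[T]] = {node: [] for node in node_set}
    for node in node_set:
        for dep in graph.get(node, []):
            if dep in node_set:
                degree[node] += 1
                dependants[dep].append(node)

    # Kahn's algorithm with a heap, so ready nodes are emitted in sorted order.
    heap = [node for node, remaining in degree.items() if remaining == 0]
    heapq.heapify(heap)
    while heap:
        node = heapq.heappop(heap)
        yield node
        for child in dependants[node]:
            degree[child] -= 1
            if degree[child] == 0:
                heapq.heappush(heap, child)


graph = {1: [], 2: [1], 3: [1], 4: [2, 3]}
assert list(sorted_topologically_sketch([4, 3, 2, 1], graph)) == [1, 2, 3, 4]
assert list(sorted_topologically_sketch([4, 3], {1: [], 2: [1], 3: [2], 4: [3]})) == [3, 4]
```

This is also why the comment in `test_fork` can promise a single answer: both `[1, 2, 3, 4]` and `[1, 3, 2, 4]` are valid topological orders, but the heap makes the tie-break deterministic.
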