diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index 0c4504d5d8..2b74a40166 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -222,6 +222,7 @@ def main() -> None:
args = parser.parse_args()
+ config: Optional[Dict[str, Any]] = None
if "config" in args and args.config:
config = yaml.safe_load(args.config)
@@ -229,7 +230,7 @@ def main() -> None:
secret = args.shared_secret
else:
# argparse should check that we have either config or shared secret
- assert config
+ assert config is not None
secret = config.get("registration_shared_secret")
secret_file = config.get("registration_shared_secret_path")
@@ -244,7 +245,7 @@ def main() -> None:
if args.server_url:
server_url = args.server_url
- elif config:
+ elif config is not None:
server_url = _find_client_listener(config)
if not server_url:
server_url = _DEFAULT_SERVER_URL
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index bc04a0755b..89723d24fa 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -230,6 +230,9 @@ class EventContentFields:
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"
+ # an unspecced field added to to-device messages to identify them uniquely-ish
+ TO_DEVICE_MSGID: Final = "org.matrix.msgid"
+
class RoomTypes:
"""Understood values of the room_type field of m.room.create events."""
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e2cfcea0f2..76ef12ed3a 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -300,10 +300,8 @@ class InteractiveAuthIncompleteError(Exception):
class UnrecognizedRequestError(SynapseError):
"""An error indicating we don't understand the request you're trying to make"""
- def __init__(
- self, msg: str = "Unrecognized request", errcode: str = Codes.UNRECOGNIZED
- ):
- super().__init__(400, msg, errcode)
+ def __init__(self, msg: str = "Unrecognized request", code: int = 400):
+ super().__init__(code, msg, Codes.UNRECOGNIZED)
class NotFoundError(SynapseError):
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index e37acb0f1e..ac62011c9f 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Callable, Dict, Optional
+from typing import Callable, Dict, List, Optional
import attr
@@ -51,6 +51,13 @@ class RoomDisposition:
UNSTABLE = "unstable"
+class PushRuleRoomFlag:
+ """Enum for listing possible MSC3931 room version feature flags, for push rules"""
+
+ # MSC3932: Room version supports MSC1767 Extensible Events.
+ EXTENSIBLE_EVENTS = "org.matrix.msc3932.extensible_events"
+
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomVersion:
"""An object which describes the unique attributes of a room version."""
@@ -91,6 +98,12 @@ class RoomVersion:
msc3787_knock_restricted_join_rule: bool
# MSC3667: Enforce integer power levels
msc3667_int_only_power_levels: bool
+ # MSC3931: Adds a push rule condition for "room version feature flags", making
+ # some push rules room version dependent. Note that adding a flag to this list
+ # is not enough to mark it "supported": the push rule evaluator also needs to
+ # support the flag. Unknown flags are ignored by the evaluator, making conditions
+ # fail if used.
+ msc3931_push_features: List[str] # values from PushRuleRoomFlag
class RoomVersions:
@@ -111,6 +124,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V2 = RoomVersion(
"2",
@@ -129,6 +143,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V3 = RoomVersion(
"3",
@@ -147,6 +162,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V4 = RoomVersion(
"4",
@@ -165,6 +181,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V5 = RoomVersion(
"5",
@@ -183,6 +200,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V6 = RoomVersion(
"6",
@@ -201,6 +219,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
MSC2176 = RoomVersion(
"org.matrix.msc2176",
@@ -219,6 +238,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V7 = RoomVersion(
"7",
@@ -237,6 +257,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V8 = RoomVersion(
"8",
@@ -255,6 +276,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V9 = RoomVersion(
"9",
@@ -273,6 +295,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
MSC3787 = RoomVersion(
"org.matrix.msc3787",
@@ -291,6 +314,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
)
V10 = RoomVersion(
"10",
@@ -309,6 +333,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
+ msc3931_push_features=[],
)
MSC2716v4 = RoomVersion(
"org.matrix.msc2716v4",
@@ -327,6 +352,27 @@ class RoomVersions:
msc2716_redactions=True,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
+ msc3931_push_features=[],
+ )
+ MSC1767v10 = RoomVersion(
+ # MSC1767 (Extensible Events) based on room version "10"
+ "org.matrix.msc1767.10",
+ RoomDisposition.UNSTABLE,
+ EventFormatVersions.ROOM_V4_PLUS,
+ StateResolutionVersions.V2,
+ enforce_key_validity=True,
+ special_case_aliases_auth=False,
+ strict_canonicaljson=True,
+ limit_notifications_power_levels=True,
+ msc2176_redaction_rules=False,
+ msc3083_join_rules=True,
+ msc3375_redaction_rules=True,
+ msc2403_knocking=True,
+ msc2716_historical=False,
+ msc2716_redactions=False,
+ msc3787_knock_restricted_join_rule=True,
+ msc3667_int_only_power_levels=True,
+ msc3931_push_features=[PushRuleRoomFlag.EXTENSIBLE_EVENTS],
)
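
A minimal sketch of how the new `msc3931_push_features` list is meant to be consumed, assuming the room version has been registered in `KNOWN_ROOM_VERSIONS`; the helper function is made up for illustration:

```python
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, PushRuleRoomFlag


def room_version_supports_extensible_events(room_version_id: str) -> bool:
    # Hypothetical helper: unknown room versions, or versions without the
    # flag, are treated as not supporting MSC1767 Extensible Events, so push
    # rule conditions that require the flag would not match.
    room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
    if room_version is None:
        return False
    return PushRuleRoomFlag.EXTENSIBLE_EVENTS in room_version.msc3931_push_features
```
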
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 46dc731696..bcc8abe20c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -44,40 +44,8 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
+from synapse.rest import ClientRestResource
from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.client import (
- account_data,
- events,
- initial_sync,
- login,
- presence,
- profile,
- push_rule,
- read_marker,
- receipts,
- relations,
- room,
- room_batch,
- room_keys,
- sendtodevice,
- sync,
- tags,
- user_directory,
- versions,
- voip,
-)
-from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
-from synapse.rest.client.devices import DevicesRestServlet
-from synapse.rest.client.keys import (
- KeyChangesServlet,
- KeyQueryServlet,
- KeyUploadServlet,
- OneTimeKeyServlet,
-)
-from synapse.rest.client.register import (
- RegisterRestServlet,
- RegistrationTokenValidityRestServlet,
-)
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -200,45 +168,7 @@ class GenericWorkerServer(HomeServer):
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
- resource = JsonResource(self, canonical_json=False)
-
- RegisterRestServlet(self).register(resource)
- RegistrationTokenValidityRestServlet(self).register(resource)
- login.register_servlets(self, resource)
- ThreepidRestServlet(self).register(resource)
- WhoamiRestServlet(self).register(resource)
- DevicesRestServlet(self).register(resource)
-
- # Read-only
- KeyUploadServlet(self).register(resource)
- KeyQueryServlet(self).register(resource)
- KeyChangesServlet(self).register(resource)
- OneTimeKeyServlet(self).register(resource)
-
- voip.register_servlets(self, resource)
- push_rule.register_servlets(self, resource)
- versions.register_servlets(self, resource)
-
- profile.register_servlets(self, resource)
-
- sync.register_servlets(self, resource)
- events.register_servlets(self, resource)
- room.register_servlets(self, resource, is_worker=True)
- relations.register_servlets(self, resource)
- room.register_deprecated_servlets(self, resource)
- initial_sync.register_servlets(self, resource)
- room_batch.register_servlets(self, resource)
- room_keys.register_servlets(self, resource)
- tags.register_servlets(self, resource)
- account_data.register_servlets(self, resource)
- receipts.register_servlets(self, resource)
- read_marker.register_servlets(self, resource)
-
- sendtodevice.register_servlets(self, resource)
-
- user_directory.register_servlets(self, resource)
-
- presence.register_servlets(self, resource)
+ resource: Resource = ClientRestResource(self)
resources[CLIENT_API_PREFIX] = resource
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 500bdde3a9..bf4e6c629b 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -32,9 +32,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-# Type for the `device_one_time_key_counts` field in an appservice transaction
+# Type for the `device_one_time_keys_count` field in an appservice transaction
# user ID -> {device ID -> {algorithm -> count}}
-TransactionOneTimeKeyCounts = Dict[str, Dict[str, Dict[str, int]]]
+TransactionOneTimeKeysCount = Dict[str, Dict[str, Dict[str, int]]]
# Type for the `device_unused_fallback_key_types` field in an appservice transaction
# user ID -> {device ID -> [algorithm]}
@@ -376,7 +376,7 @@ class AppServiceTransaction:
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
- one_time_key_counts: TransactionOneTimeKeyCounts,
+ one_time_keys_count: TransactionOneTimeKeysCount,
unused_fallback_keys: TransactionUnusedFallbackKeys,
device_list_summary: DeviceListUpdates,
):
@@ -385,7 +385,7 @@ class AppServiceTransaction:
self.events = events
self.ephemeral = ephemeral
self.to_device_messages = to_device_messages
- self.one_time_key_counts = one_time_key_counts
+ self.one_time_keys_count = one_time_keys_count
self.unused_fallback_keys = unused_fallback_keys
self.device_list_summary = device_list_summary
@@ -402,7 +402,7 @@ class AppServiceTransaction:
events=self.events,
ephemeral=self.ephemeral,
to_device_messages=self.to_device_messages,
- one_time_key_counts=self.one_time_key_counts,
+ one_time_keys_count=self.one_time_keys_count,
unused_fallback_keys=self.unused_fallback_keys,
device_list_summary=self.device_list_summary,
txn_id=self.id,
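
For reference, a minimal illustrative value for the renamed `TransactionOneTimeKeysCount` type; the user, device and algorithm names are made up:

```python
from synapse.appservice import TransactionOneTimeKeysCount

# user ID -> device ID -> algorithm -> remaining one-time key count
one_time_keys_count: TransactionOneTimeKeysCount = {
    "@alice:example.org": {
        "DEVICEID": {"signed_curve25519": 50},
    },
}
```
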
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 60774b240d..edafd433cd 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.appservice import (
ApplicationService,
- TransactionOneTimeKeyCounts,
+ TransactionOneTimeKeysCount,
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
@@ -262,7 +262,7 @@ class ApplicationServiceApi(SimpleHttpClient):
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
- one_time_key_counts: TransactionOneTimeKeyCounts,
+ one_time_keys_count: TransactionOneTimeKeysCount,
unused_fallback_keys: TransactionUnusedFallbackKeys,
device_list_summary: DeviceListUpdates,
txn_id: Optional[int] = None,
@@ -310,10 +310,13 @@ class ApplicationServiceApi(SimpleHttpClient):
# TODO: Update to stable prefixes once MSC3202 completes FCP merge
if service.msc3202_transaction_extensions:
- if one_time_key_counts:
+ if one_time_keys_count:
body[
"org.matrix.msc3202.device_one_time_key_counts"
- ] = one_time_key_counts
+ ] = one_time_keys_count
+ body[
+ "org.matrix.msc3202.device_one_time_keys_count"
+ ] = one_time_keys_count
if unused_fallback_keys:
body[
"org.matrix.msc3202.device_unused_fallback_key_types"
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 430ffbcd1f..7b562795a3 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -64,7 +64,7 @@ from typing import (
from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
- TransactionOneTimeKeyCounts,
+ TransactionOneTimeKeysCount,
TransactionUnusedFallbackKeys,
)
from synapse.appservice.api import ApplicationServiceApi
@@ -258,7 +258,7 @@ class _ServiceQueuer:
):
return
- one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None
+ one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None
if (
@@ -269,7 +269,7 @@ class _ServiceQueuer:
# for the users which are mentioned in this transaction,
# as well as the appservice's sender.
(
- one_time_key_counts,
+ one_time_keys_count,
unused_fallback_keys,
) = await self._compute_msc3202_otk_counts_and_fallback_keys(
service, events, ephemeral, to_device_messages_to_send
@@ -281,7 +281,7 @@ class _ServiceQueuer:
events,
ephemeral,
to_device_messages_to_send,
- one_time_key_counts,
+ one_time_keys_count,
unused_fallback_keys,
device_list_summary,
)
@@ -296,7 +296,7 @@ class _ServiceQueuer:
events: Iterable[EventBase],
ephemerals: Iterable[JsonDict],
to_device_messages: Iterable[JsonDict],
- ) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]:
+ ) -> Tuple[TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys]:
"""
Given a list of the events, ephemeral messages and to-device messages,
- first computes a list of application services users that may have
@@ -367,7 +367,7 @@ class _TransactionController:
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
to_device_messages: Optional[List[JsonDict]] = None,
- one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None,
+ one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None,
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None,
device_list_summary: Optional[DeviceListUpdates] = None,
) -> None:
@@ -380,7 +380,7 @@ class _TransactionController:
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
to_device_messages: The to-device messages to include in the transaction.
- one_time_key_counts: Counts of remaining one-time keys for relevant
+ one_time_keys_count: Counts of remaining one-time keys for relevant
appservice devices in the transaction.
unused_fallback_keys: Lists of unused fallback keys for relevant
appservice devices in the transaction.
@@ -397,7 +397,7 @@ class _TransactionController:
events=events,
ephemeral=ephemeral or [],
to_device_messages=to_device_messages or [],
- one_time_key_counts=one_time_key_counts or {},
+ one_time_keys_count=one_time_keys_count or {},
unused_fallback_keys=unused_fallback_keys or {},
device_list_summary=device_list_summary or DeviceListUpdates(),
)
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index d4b71d1673..573fa0386f 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -16,6 +16,7 @@ from typing import Any, Optional
import attr
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.config._base import Config
from synapse.types import JsonDict
@@ -53,9 +54,6 @@ class ExperimentalConfig(Config):
# MSC3266 (room summary api)
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
- # MSC3030 (Jump to date API endpoint)
- self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
-
# MSC2409 (this setting only relates to optionally sending to-device messages).
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. If enabled, this adds to-device messages to that list.
@@ -131,3 +129,10 @@ class ExperimentalConfig(Config):
# MSC3912: Relation-based redactions.
self.msc3912_enabled: bool = experimental.get("msc3912_enabled", False)
+
+ # MSC1767 and friends: Extensible Events
+ self.msc1767_enabled: bool = experimental.get("msc1767_enabled", False)
+ if self.msc1767_enabled:
+ # Enable room version (and thus applicable push rules from MSC3931/3932)
+ version_id = RoomVersions.MSC1767v10.identifier
+ KNOWN_ROOM_VERSIONS[version_id] = RoomVersions.MSC1767v10
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 979b128eae..3b5378e6ea 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -26,6 +26,7 @@ class PushConfig(Config):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
push_config = config.get("push") or {}
self.push_include_content = push_config.get("include_content", True)
+ self.enable_push = push_config.get("enabled", True)
self.push_group_unread_count_by_room = push_config.get(
"group_unread_count_by_room", True
)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ed15f88350..69310d9035 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -14,7 +14,6 @@
import abc
import logging
-import urllib
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
import attr
@@ -813,31 +812,27 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
results = {}
- async def get_key(key_to_fetch_item: _FetchKeyRequest) -> None:
+ async def get_keys(key_to_fetch_item: _FetchKeyRequest) -> None:
server_name = key_to_fetch_item.server_name
- key_ids = key_to_fetch_item.key_ids
try:
- keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
+ keys = await self.get_server_verify_keys_v2_direct(server_name)
results[server_name] = keys
except KeyLookupError as e:
- logger.warning(
- "Error looking up keys %s from %s: %s", key_ids, server_name, e
- )
+ logger.warning("Error looking up keys from %s: %s", server_name, e)
except Exception:
- logger.exception("Error getting keys %s from %s", key_ids, server_name)
+ logger.exception("Error getting keys from %s", server_name)
- await yieldable_gather_results(get_key, keys_to_fetch)
+ await yieldable_gather_results(get_keys, keys_to_fetch)
return results
- async def get_server_verify_key_v2_direct(
- self, server_name: str, key_ids: Iterable[str]
+ async def get_server_verify_keys_v2_direct(
+ self, server_name: str
) -> Dict[str, FetchKeyResult]:
"""
Args:
- server_name:
- key_ids:
+ server_name: Server to request keys from
Returns:
Map from key ID to lookup result
@@ -845,57 +840,41 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
Raises:
KeyLookupError if there was a problem making the lookup
"""
- keys: Dict[str, FetchKeyResult] = {}
-
- for requested_key_id in key_ids:
- # we may have found this key as a side-effect of asking for another.
- if requested_key_id in keys:
- continue
-
- time_now_ms = self.clock.time_msec()
- try:
- response = await self.client.get_json(
- destination=server_name,
- path="/_matrix/key/v2/server/"
- + urllib.parse.quote(requested_key_id, safe=""),
- ignore_backoff=True,
- # we only give the remote server 10s to respond. It should be an
- # easy request to handle, so if it doesn't reply within 10s, it's
- # probably not going to.
- #
- # Furthermore, when we are acting as a notary server, we cannot
- # wait all day for all of the origin servers, as the requesting
- # server will otherwise time out before we can respond.
- #
- # (Note that get_json may make 4 attempts, so this can still take
- # almost 45 seconds to fetch the headers, plus up to another 60s to
- # read the response).
- timeout=10000,
- )
- except (NotRetryingDestination, RequestSendFailed) as e:
- # these both have str() representations which we can't really improve
- # upon
- raise KeyLookupError(str(e))
- except HttpResponseException as e:
- raise KeyLookupError("Remote server returned an error: %s" % (e,))
-
- assert isinstance(response, dict)
- if response["server_name"] != server_name:
- raise KeyLookupError(
- "Expected a response for server %r not %r"
- % (server_name, response["server_name"])
- )
-
- response_keys = await self.process_v2_response(
- from_server=server_name,
- response_json=response,
- time_added_ms=time_now_ms,
+ time_now_ms = self.clock.time_msec()
+ try:
+ response = await self.client.get_json(
+ destination=server_name,
+ path="/_matrix/key/v2/server",
+ ignore_backoff=True,
+ # we only give the remote server 10s to respond. It should be an
+ # easy request to handle, so if it doesn't reply within 10s, it's
+ # probably not going to.
+ #
+ # Furthermore, when we are acting as a notary server, we cannot
+ # wait all day for all of the origin servers, as the requesting
+ # server will otherwise time out before we can respond.
+ #
+ # (Note that get_json may make 4 attempts, so this can still take
+ # almost 45 seconds to fetch the headers, plus up to another 60s to
+ # read the response).
+ timeout=10000,
)
- await self.store.store_server_verify_keys(
- server_name,
- time_now_ms,
- ((server_name, key_id, key) for key_id, key in response_keys.items()),
+ except (NotRetryingDestination, RequestSendFailed) as e:
+ # these both have str() representations which we can't really improve
+ # upon
+ raise KeyLookupError(str(e))
+ except HttpResponseException as e:
+ raise KeyLookupError("Remote server returned an error: %s" % (e,))
+
+ assert isinstance(response, dict)
+ if response["server_name"] != server_name:
+ raise KeyLookupError(
+ "Expected a response for server %r not %r"
+ % (server_name, response["server_name"])
)
- keys.update(response_keys)
- return keys
+ return await self.process_v2_response(
+ from_server=server_name,
+ response_json=response,
+ time_added_ms=time_now_ms,
+ )
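
The rewrite above works because a single `GET /_matrix/key/v2/server` request returns all of a server's current signing keys, so there is no longer any need to loop over individual key IDs. Roughly the response shape being processed (abbreviated, with made-up values):

```python
# Abbreviated /_matrix/key/v2/server response; process_v2_response() turns the
# "verify_keys" / "old_verify_keys" entries into a key_id -> FetchKeyResult map.
response = {
    "server_name": "example.org",
    "valid_until_ts": 1672531200000,
    "verify_keys": {
        "ed25519:abc123": {"key": "base64+encoded+public+key"},
    },
    "old_verify_keys": {},
    "signatures": {"example.org": {"ed25519:abc123": "base64+signature"}},
}
```
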
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index d62906043f..94dd1298e1 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -28,8 +28,8 @@ from synapse.event_auth import auth_types_for_event
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
-from synapse.storage.state import StateFilter
from synapse.types import EventID, JsonDict
+from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.stringutils import random_string
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 1c0e96bec7..6eaef8b57a 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -23,7 +23,7 @@ from synapse.types import JsonDict, StateMap
if TYPE_CHECKING:
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
- from synapse.storage.state import StateFilter
+ from synapse.types.state import StateFilter
@attr.s(slots=True, auto_attribs=True)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c4c0bc7315..137cfb3346 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -771,17 +771,28 @@ class FederationClient(FederationBase):
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
- # There is no good way to detect an "unknown" endpoint.
+ # MSC3743 specifies that servers should return a 404 or 405 with an errcode
+ # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
+ # to an unknown method, respectively.
#
- # Dendrite returns a 404 (with a body of "404 page not found");
- # Conduit returns a 404 (with no body); and Synapse returns a 400
- # with M_UNRECOGNIZED.
- #
- # This needs to be rather specific as some endpoints truly do return 404
- # errors.
+ # Older versions of servers don't properly handle this. This needs to be
+ # rather specific as some endpoints truly do return 404 errors.
return (
- e.code == 404 and (not e.response or e.response == b"404 page not found")
- ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
+ # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
+ (e.code == 404 or e.code == 405)
+ and (
+ # Older Dendrites returned a text or empty body.
+ # Older Conduit returned an empty body.
+ not e.response
+ or e.response == b"404 page not found"
+ # The proper response JSON with M_UNRECOGNIZED errcode.
+ or synapse_error.errcode == Codes.UNRECOGNIZED
+ )
+ ) or (
+ # Older Synapses returned a 400 error.
+ e.code == 400
+ and synapse_error.errcode == Codes.UNRECOGNIZED
+ )
async def _try_destination_list(
self,
@@ -1691,9 +1702,19 @@ class FederationClient(FederationBase):
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
_timestamp_to_event_from_destination,
+ # Since this endpoint is new, we should try other servers before giving up.
+ # We can safely remove this in a year (remove after 2023-11-16).
+ failover_on_unknown_endpoint=True,
)
return timestamp_to_event_response
- except SynapseError:
+ except SynapseError as e:
+            logger.warning(
+ "timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s",
+ room_id,
+ timestamp,
+ direction,
+ e,
+ )
return None
async def _timestamp_to_event_from_destination(
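
To make the branches above concrete, here are some illustrative (status, body, errcode) combinations and how the updated check classifies them; the bodies are representative, not captured from real servers:

```python
# (HTTP status, raw body, parsed errcode) -> classified as "unknown endpoint"?
examples = [
    (404, b"", None),                                           # older Conduit: yes
    (404, b"404 page not found", None),                         # older Dendrite: yes
    (405, b'{"errcode": "M_UNRECOGNIZED"}', "M_UNRECOGNIZED"),  # MSC3743-compliant: yes
    (400, b'{"errcode": "M_UNRECOGNIZED"}', "M_UNRECOGNIZED"),  # older Synapse: yes
    (404, b'{"errcode": "M_NOT_FOUND"}', "M_NOT_FOUND"),        # genuine 404: no
]
```
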
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fc1d8c88a7..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+ domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
domains = [
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3ae5e8634c..ffc9d95ee7 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import JsonDict, ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.visibility import filter_events_for_server
@@ -136,8 +136,11 @@ class PerDestinationQueue:
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}
- # room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
+        # A list of EDU contents, each mapping room_id -> receipt_type -> user_id -> receipt_dict.
+        #
+        # Each EDU can only contain a single receipt per
+        # (room ID, receipt type, user ID, thread ID) tuple.
+ self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ class PerDestinationQueue:
Args:
receipt: receipt to be queued
"""
- self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
- receipt.receipt_type, {}
- )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
+ serialized_receipt: JsonDict = {
+ "event_ids": receipt.event_ids,
+ "data": receipt.data,
+ }
+ if receipt.thread_id is not None:
+ serialized_receipt["data"]["thread_id"] = receipt.thread_id
+
+ # Find which EDU to add this receipt to. There's three situations depending
+ # on the (room ID, receipt type, user, thread ID) tuple:
+ #
+ # 1. If it fully matches, clobber the information.
+ # 2. If it is missing, add the information.
+ # 3. If the subset tuple of (room ID, receipt type, user) matches, check
+ # the next EDU (or add a new EDU).
+ for edu in self._pending_receipt_edus:
+ receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
+ receipt.receipt_type, {}
+ )
+ # If this room ID, receipt type, user ID is not in this EDU, OR if
+ # the full tuple matches, use the current EDU.
+ if (
+ receipt.user_id not in receipt_content
+ or receipt_content[receipt.user_id].get("thread_id")
+ == receipt.thread_id
+ ):
+ receipt_content[receipt.user_id] = serialized_receipt
+ break
+
+ # If no matching EDU was found, create a new one.
+ else:
+ self._pending_receipt_edus.append(
+ {
+ receipt.room_id: {
+ receipt.receipt_type: {receipt.user_id: serialized_receipt}
+ }
+ }
+ )
def flush_read_receipts_for_room(self, room_id: str) -> None:
- # if we don't have any read-receipts for this room, it may be that we've already
- # sent them out, so we don't need to flush.
- if room_id not in self._pending_rrs:
- return
- self._rrs_pending_flush = True
- self.attempt_new_transaction()
+ # If there are any pending receipts for this room then force-flush them
+ # in a new transaction.
+ for edu in self._pending_receipt_edus:
+ if room_id in edu:
+ self._rrs_pending_flush = True
+ self.attempt_new_transaction()
+ # No use in checking remaining EDUs if the room was found.
+ break
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ class PerDestinationQueue:
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
- self._pending_rrs = {}
+ self._pending_receipt_edus = []
self._start_catching_up()
except FederationDeniedError as e:
@@ -543,22 +582,27 @@ class PerDestinationQueue:
self._destination, last_successful_stream_ordering
)
- def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
- if not self._pending_rrs:
+ def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+ if not self._pending_receipt_edus:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
- edu = Edu(
- origin=self._server_name,
- destination=self._destination,
- edu_type=EduTypes.RECEIPT,
- content=self._pending_rrs,
- )
- self._pending_rrs = {}
- self._rrs_pending_flush = False
- yield edu
+ # Send at most limit EDUs for receipts.
+ for content in self._pending_receipt_edus[:limit]:
+ yield Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type=EduTypes.RECEIPT,
+ content=content,
+ )
+ self._pending_receipt_edus = self._pending_receipt_edus[limit:]
+
+ # If there are still pending read-receipts, don't reset the pending flush
+ # flag.
+ if not self._pending_receipt_edus:
+ self._rrs_pending_flush = False
def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
@@ -597,7 +641,7 @@ class PerDestinationQueue:
if not message_id:
continue
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
edus = [
Edu(
@@ -645,27 +689,61 @@ class _TransactionQueueManager:
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# First we calculate the EDUs we want to send, if any.
- # We start by fetching device related EDUs, i.e device updates and to
- # device messages. We have to keep 2 free slots for presence and rr_edus.
- device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+        # There's a maximum number of EDUs that can be sent with a transaction;
+        # generally device updates and to-device messages get priority, but we
+        # want to ensure that there's room for some other EDUs as well.
+ #
+ # This is done by:
+ #
+ # * Add a presence EDU, if one exists.
+        # * Add up to a small limit of read receipt EDUs.
+ # * Add to-device EDUs, but leave some space for device list updates.
+        # * Add device list update EDUs.
+ # * If there's any remaining room, add other EDUs.
+ pending_edus = []
+
+ # Add presence EDU.
+ if self.queue._pending_presence:
+ pending_edus.append(
+ Edu(
+ origin=self.queue._server_name,
+ destination=self.queue._destination,
+ edu_type=EduTypes.PRESENCE,
+ content={
+ "push": [
+ format_user_presence_state(
+ presence, self.queue._clock.time_msec()
+ )
+ for presence in self.queue._pending_presence.values()
+ ]
+ },
+ )
+ )
+ self.queue._pending_presence = {}
- # We prioritize to-device messages so that existing encryption channels
+ # Add read receipt EDUs.
+ pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+ edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+
+ # Next, prioritize to-device messages so that existing encryption channels
# work. We also keep a few slots spare (by reducing the limit) so that
# we can still trickle out some device list updates.
(
to_device_edus,
device_stream_id,
- ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
+ ) = await self.queue._get_to_device_message_edus(edu_limit - 10)
if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id
- device_edu_limit -= len(to_device_edus)
+ pending_edus.extend(to_device_edus)
+ edu_limit -= len(to_device_edus)
+ # Add device list update EDUs.
device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
- device_edu_limit
+ edu_limit
)
if device_update_edus:
@@ -673,40 +751,17 @@ class _TransactionQueueManager:
else:
self.queue._last_device_list_stream_id = dev_list_id
- pending_edus = device_update_edus + to_device_edus
-
- # Now add the read receipt EDU.
- pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
-
- # And presence EDU.
- if self.queue._pending_presence:
- pending_edus.append(
- Edu(
- origin=self.queue._server_name,
- destination=self.queue._destination,
- edu_type=EduTypes.PRESENCE,
- content={
- "push": [
- format_user_presence_state(
- presence, self.queue._clock.time_msec()
- )
- for presence in self.queue._pending_presence.values()
- ]
- },
- )
- )
- self.queue._pending_presence = {}
+ pending_edus.extend(device_update_edus)
+ edu_limit -= len(device_update_edus)
# Finally add any other types of EDUs if there is room.
- pending_edus.extend(
- self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
- )
- while (
- len(pending_edus) < MAX_EDUS_PER_TRANSACTION
- and self.queue._pending_edus_keyed
- ):
+ other_edus = self.queue._pop_pending_edus(edu_limit)
+ pending_edus.extend(other_edus)
+ edu_limit -= len(other_edus)
+ while edu_limit > 0 and self.queue._pending_edus_keyed:
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)
+ edu_limit -= 1
# Now we look for any PDUs to send, by getting up to 50 PDUs from the
# queue
@@ -717,8 +772,10 @@ class _TransactionQueueManager:
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
- if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
- pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
+ if edu_limit:
+ pending_edus.extend(
+ self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
+ )
if self._pdus:
self._last_stream_ordering = self._pdus[
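
A minimal sketch of one entry in `_pending_receipt_edus` after queuing a threaded read receipt; the IDs are made up. Because each EDU can only carry a single receipt per (room, type, user, thread) tuple, a second receipt from the same user in a different thread would be bucketed into a separate EDU:

```python
# One element of self._pending_receipt_edus, i.e. the content of one
# m.receipt EDU: room_id -> receipt_type -> user_id -> receipt dict.
pending_receipt_edu = {
    "!room:example.org": {
        "m.read": {
            "@alice:example.org": {
                "event_ids": ["$event_id"],
                "data": {"ts": 1670000000000, "thread_id": "$thread_root"},
            },
        },
    },
}
```
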
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index a3cfc701cd..77f1f39cac 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -185,9 +185,8 @@ class TransportLayerClient:
Raises:
Various exceptions when the request fails
"""
- path = _create_path(
- FEDERATION_UNSTABLE_PREFIX,
- "/org.matrix.msc3030/timestamp_to_event/%s",
+ path = _create_v1_path(
+ "/timestamp_to_event/%s",
room_id,
)
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 50623cd385..2725f53cf6 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -25,7 +25,6 @@ from synapse.federation.transport.server._base import (
from synapse.federation.transport.server.federation import (
FEDERATION_SERVLET_CLASSES,
FederationAccountStatusServlet,
- FederationTimestampLookupServlet,
)
from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import (
@@ -291,13 +290,6 @@ def register_servlets(
)
for servletclass in SERVLET_GROUPS[servlet_group]:
- # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
- if (
- servletclass == FederationTimestampLookupServlet
- and not hs.config.experimental.msc3030_enabled
- ):
- continue
-
# Only allow the `/account_status` servlet if msc3720 is enabled
if (
servletclass == FederationAccountStatusServlet
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 205fd16daa..53e77b4bb6 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -218,14 +218,13 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet):
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.
- GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
+ GET /_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""
PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
- PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
async def on_GET(
self,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 66f5b8d108..5d1d21cdc8 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -578,9 +578,6 @@ class ApplicationServicesHandler:
device_id,
), messages in recipient_device_to_messages.items():
for message_json in messages:
- # Remove 'message_id' from the to-device message, as it's an internal ID
- message_json.pop("message_id", None)
-
message_payload.append(
{
"to_user_id": user_id,
@@ -615,8 +612,8 @@ class ApplicationServicesHandler:
)
# Fetch the users who have modified their device list since then.
- users_with_changed_device_lists = (
- await self.store.get_users_whose_devices_changed(from_key, to_key=new_key)
+ users_with_changed_device_lists = await self.store.get_all_devices_changed(
+ from_key, to_key=new_key
)
# Filter out any users the application service is not interested in
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b1e55e1b9e..d4750a32e6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
- # TODO(faster joins): this fetches and processes a bunch of data that we don't
+ # TODO(faster_joins): this fetches and processes a bunch of data that we don't
# use. Could be replaced by a tighter query e.g.
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
partial_rooms = await self.store.get_partial_state_room_resync_info()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 444c08bc2e..75e89850f5 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
-from synapse.api.constants import EduTypes, ToDeviceEventTypes
+from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
@@ -216,14 +216,24 @@ class DeviceMessageHandler:
"""
sender_user_id = requester.user.to_string()
- message_id = random_string(16)
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
-
- log_kv({"number_of_to_device_messages": len(messages)})
- set_tag("sender", sender_user_id)
+ set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
+ set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
+ # add an opentracing log entry for each message
+ for device_id, message_content in by_device.items():
+ log_kv(
+ {
+ "event": "send_to_device_message",
+ "user_id": user_id,
+ "device_id": device_id,
+ EventContentFields.TO_DEVICE_MSGID: message_content.get(
+ EventContentFields.TO_DEVICE_MSGID
+ ),
+ }
+ )
+
# Ratelimit local cross-user key requests by the sending device.
if (
message_type == ToDeviceEventTypes.RoomKeyRequest
@@ -233,6 +243,7 @@ class DeviceMessageHandler:
requester, (sender_user_id, requester.device_id)
)
if not allowed:
+ log_kv({"message": f"dropping key requests to {user_id}"})
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
@@ -247,18 +258,11 @@ class DeviceMessageHandler:
"content": message_content,
"type": message_type,
"sender": sender_user_id,
- "message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
- log_kv(
- {
- "user_id": user_id,
- "device_id": list(messages_by_device),
- }
- )
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
@@ -267,7 +271,11 @@ class DeviceMessageHandler:
remote_edu_contents = {}
for destination, messages in remote_messages.items():
- log_kv({"destination": destination})
+ # The EDU contains a "message_id" property which is used for
+ # idempotence. Make up a random one.
+ message_id = random_string(16)
+ log_kv({"destination": destination, "message_id": message_id})
+
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d92582fd5c..b2784d7333 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -70,8 +70,8 @@ from synapse.replication.http.federation import (
)
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import JsonDict, get_domain_from_id
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -152,6 +152,7 @@ class FederationHandler:
self._federation_event_handler = hs.get_federation_event_handler()
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
+ self._notifier = hs.get_notifier()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@@ -1692,6 +1693,9 @@ class FederationHandler:
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
+ # Poke the notifier so that other workers see the write to
+ # the un-partial-stated rooms stream.
+ self._notifier.notify_replication()
# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f7223b03c3..d2facdab60 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -75,7 +75,6 @@ from synapse.replication.http.federation import (
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
@@ -83,6 +82,7 @@ from synapse.types import (
UserID,
get_domain_from_id,
)
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4cf593cfdc..d6e90ef259 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -59,7 +59,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@@ -70,6 +69,7 @@ from synapse.types import (
UserID,
create_requester,
)
+from synapse.types.state import StateFilter
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results
from synapse.util.caches.expiringcache import ExpiringCache
@@ -1135,11 +1135,13 @@ class EventCreationHandler:
)
state_events = await self.store.get_events_as_list(state_event_ids)
# Create a StateMap[str]
- state_map = {(e.type, e.state_key): e.event_id for e in state_events}
+ current_state_ids = {
+ (e.type, e.state_key): e.event_id for e in state_events
+ }
# Actually strip down and only use the necessary auth events
auth_event_ids = self._event_auth_handler.compute_auth_events(
event=temp_event,
- current_state_ids=state_map,
+ current_state_ids=current_state_ids,
for_verification=False,
)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index c572508a02..8c8ff18a1a 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,9 +27,9 @@ from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
-from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf08737d11..2af90b25a3 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
if from_key is not None:
# First get all users that have had a presence update
- updated_users = stream_change_cache.get_all_entities_changed(from_key)
+ result = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates.
- if updated_users is not None:
+ if result.hit:
+ updated_users = result.entities
+
# If we have the full list of changes for presence we can
# simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
@@ -1764,14 +1766,14 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
Returns:
A list of presence states for the given user to receive.
"""
+ updated_users = None
if from_key:
# Only return updates since the last sync
- updated_users = self.store.presence_stream_cache.get_all_entities_changed(
- from_key
- )
- if not updated_users:
- updated_users = []
+ result = self.store.presence_stream_cache.get_all_entities_changed(from_key)
+ if result.hit:
+ updated_users = result.entities
+ if updated_users is not None:
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index ac01582442..6a4fed1156 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -92,7 +92,6 @@ class ReceiptsHandler:
continue
# Check if these receipts apply to a thread.
- thread_id = None
data = user_values.get("data", {})
thread_id = data.get("thread_id")
# If the thread ID is invalid, consider it missing.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 6307fa9c5d..c611efb760 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -46,8 +46,8 @@ from synapse.replication.http.register import (
ReplicationRegisterServlet,
)
from synapse.spam_checker_api import RegistrationBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID, create_requester
+from synapse.types.state import StateFilter
if TYPE_CHECKING:
from synapse.server import HomeServer
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6dcfd86fdf..f81241c2b3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -62,7 +62,6 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
-from synapse.storage.state import StateFilter
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -77,6 +76,7 @@ from synapse.types import (
UserID,
create_requester,
)
+from synapse.types.state import StateFilter
from synapse.util import stringutils
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_and_validate_server_name
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6ad2b38b8f..0c39e852a1 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -34,7 +34,6 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
-from synapse.storage.state import StateFilter
from synapse.types import (
JsonDict,
Requester,
@@ -45,6 +44,7 @@ from synapse.types import (
create_requester,
get_domain_from_id,
)
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_left_room
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index bcab98c6d5..33115ce488 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,8 +23,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events import EventBase
-from synapse.storage.state import StateFilter
from synapse.types import JsonDict, StreamKeyType, UserID
+from synapse.types.state import StateFilter
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 259456b55d..7d6a653747 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -31,19 +31,24 @@ from typing import (
import attr
from prometheus_client import Counter
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
+from synapse.logging.opentracing import (
+ SynapseTags,
+ log_kv,
+ set_tag,
+ start_active_span,
+ trace,
+)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.roommember import MemberSummary
-from synapse.storage.state import StateFilter
from synapse.types import (
DeviceListUpdates,
JsonDict,
@@ -55,6 +60,7 @@ from synapse.types import (
StreamToken,
UserID,
)
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
@@ -1426,14 +1432,14 @@ class SyncHandler:
logger.debug("Fetching OTK data")
device_id = sync_config.device_id
- one_time_key_counts: JsonDict = {}
+ one_time_keys_count: JsonDict = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
- one_time_key_counts = await self.store.count_e2e_one_time_keys(
+ one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = (
@@ -1463,7 +1469,7 @@ class SyncHandler:
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
- device_one_time_keys_count=one_time_key_counts,
+ device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
@@ -1528,10 +1534,12 @@ class SyncHandler:
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
- changed_users = self.store.get_cached_device_list_changes(
+ cache_result = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
- if changed_users is not None:
+ if cache_result.hit:
+ changed_users = cache_result.entities
+
result = await self.store.get_rooms_for_users(changed_users)
for changed_user_id, entries in result.items():
@@ -1584,6 +1592,7 @@ class SyncHandler:
else:
return DeviceListUpdates()
+ @trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
@@ -1603,11 +1612,16 @@ class SyncHandler:
)
for message in messages:
- # We pop here as we shouldn't be sending the message ID down
- # `/sync`
- message_id = message.pop("message_id", None)
- if message_id:
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ log_kv(
+ {
+ "event": "to_device_message",
+ "sender": message["sender"],
+ "type": message["type"],
+ EventContentFields.TO_DEVICE_MSGID: message["content"].get(
+ EventContentFields.TO_DEVICE_MSGID
+ ),
+ }
+ )
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a0ea719430..3f656ea4f5 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler):
if last_id == current_id:
return [], current_id, False
- changed_rooms: Optional[
- Iterable[str]
- ] = self._typing_stream_change_cache.get_all_entities_changed(last_id)
+ result = self._typing_stream_change_cache.get_all_entities_changed(last_id)
- if changed_rooms is None:
+ if result.hit:
+ changed_rooms: Iterable[str] = result.entities
+ else:
changed_rooms = self._room_serials
rows = []
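
The typing change above, together with the presence and sync changes earlier in this diff, switches to a common calling pattern: `get_all_entities_changed` now returns a result object rather than an `Optional` collection. A hedged sketch of that pattern (not Synapse's actual helper; attribute names as used above):

```python
from typing import Collection, Iterable


def entities_to_check(
    stream_change_cache, from_key: int, all_known_entities: Collection[str]
) -> Iterable[str]:
    # Illustrative only: shows the new .hit / .entities calling convention.
    result = stream_change_cache.get_all_entities_changed(from_key)
    if result.hit:
        # The cache covers from_key: result.entities is the definitive list of
        # entities changed since then (possibly empty).
        return result.entities
    # Cache miss: fall back to checking everything we know about.
    return all_known_entities
```
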
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 051a1899a0..2563858f3c 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -577,7 +577,24 @@ def _unrecognised_request_handler(request: Request) -> NoReturn:
Args:
request: Unused, but passed in to match the signature of ServletCallback.
"""
- raise UnrecognizedRequestError()
+ raise UnrecognizedRequestError(code=404)
+
+
+class UnrecognizedRequestResource(resource.Resource):
+ """
+ Similar to twisted.web.resource.NoResource, but returns a JSON 404 with an
+ errcode of M_UNRECOGNIZED.
+ """
+
+ def render(self, request: SynapseRequest) -> int:
+ f = failure.Failure(UnrecognizedRequestError(code=404))
+ return_json_error(f, request, None)
+ # A response has already been sent but Twisted requires either NOT_DONE_YET
+ # or the response bytes as a return value.
+ return NOT_DONE_YET
+
+ def getChild(self, name: str, request: Request) -> resource.Resource:
+ return self
class RootRedirect(resource.Resource):
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index b69060854f..a705af8356 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -292,8 +292,15 @@ logger = logging.getLogger(__name__)
class SynapseTags:
- # The message ID of any to_device message processed
- TO_DEVICE_MESSAGE_ID = "to_device.message_id"
+ # The message ID of any to_device EDU processed
+ TO_DEVICE_EDU_ID = "to_device.edu_id"
+
+ # Details about to-device messages
+ TO_DEVICE_TYPE = "to_device.type"
+ TO_DEVICE_SENDER = "to_device.sender"
+ TO_DEVICE_RECIPIENT = "to_device.recipient"
+ TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
+ TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID
# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
diff --git a/synapse/metrics/common_usage_metrics.py b/synapse/metrics/common_usage_metrics.py
index 0a22ea3d92..6e05b043d3 100644
--- a/synapse/metrics/common_usage_metrics.py
+++ b/synapse/metrics/common_usage_metrics.py
@@ -54,7 +54,9 @@ class CommonUsageMetricsManager:
async def setup(self) -> None:
"""Keep the gauges for common usage metrics up to date."""
- await self._update_gauges()
+ run_as_background_process(
+ desc="common_usage_metrics_update_gauges", func=self._update_gauges
+ )
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 38c71b8b43..6da9e1d8b2 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -112,7 +112,6 @@ from synapse.storage.background_updates import (
)
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
-from synapse.storage.state import StateFilter
from synapse.types import (
DomainSpecificString,
JsonDict,
@@ -125,6 +124,7 @@ from synapse.types import (
UserProfile,
create_requester,
)
+from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.async_helpers import maybe_awaitable
from synapse.util.caches.descriptors import CachedFunction, cached
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 75b7e126ca..36e5b327ef 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -29,13 +29,14 @@ from typing import (
from prometheus_client import Counter
from synapse.api.constants import MAIN_TIMELINE, EventTypes, Membership, RelationTypes
+from synapse.api.room_versions import PushRuleRoomFlag, RoomVersion
from synapse.event_auth import auth_types_for_event, get_user_power_level
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
-from synapse.storage.state import StateFilter
from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
+from synapse.types.state import StateFilter
from synapse.util.caches import register_cache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_event_for_clients_with_state
@@ -105,6 +106,7 @@ class BulkPushRuleEvaluator:
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self._event_auth_handler = hs.get_event_auth_handler()
+ self.should_calculate_push_rules = self.hs.config.push.enable_push
self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
@@ -268,6 +270,8 @@ class BulkPushRuleEvaluator:
for each event, check if the message should increment the unread count, and
insert the results into the event_push_actions_staging table.
"""
+ if not self.should_calculate_push_rules:
+ return
# For batched events the power level events may not have been persisted yet,
# so we pass in the batched events. Thus if the event cannot be found in the
# database we can check in the batch.
@@ -338,13 +342,19 @@ class BulkPushRuleEvaluator:
for user_id, level in notification_levels.items():
notification_levels[user_id] = int(level)
+ room_version_features = event.room_version.msc3931_push_features
+ if not room_version_features:
+ room_version_features = []
+
evaluator = PushRuleEvaluator(
- _flatten_dict(event),
+ _flatten_dict(event, room_version=event.room_version),
room_member_count,
sender_power_level,
notification_levels,
related_events,
self._related_event_match_enabled,
+ room_version_features,
+ self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
)
users = rules_by_user.keys()
@@ -420,6 +430,7 @@ StateGroup = Union[object, int]
def _flatten_dict(
d: Union[EventBase, Mapping[str, Any]],
+ room_version: Optional[RoomVersion] = None,
prefix: Optional[List[str]] = None,
result: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
@@ -431,6 +442,31 @@ def _flatten_dict(
if isinstance(value, str):
result[".".join(prefix + [key])] = value.lower()
elif isinstance(value, Mapping):
+ # do not set `room_version` due to recursion considerations below
_flatten_dict(value, prefix=(prefix + [key]), result=result)
+ # `room_version` should only ever be set when looking at the top level of an event
+ if (
+ room_version is not None
+ and PushRuleRoomFlag.EXTENSIBLE_EVENTS in room_version.msc3931_push_features
+ and isinstance(d, EventBase)
+ ):
+ # Room supports extensible events: replace `content.body` with the plain text
+ # representation from `m.markup`, as per MSC1767.
+ markup = d.get("content").get("m.markup")
+ if room_version.identifier.startswith("org.matrix.msc1767."):
+ markup = d.get("content").get("org.matrix.msc1767.markup")
+ if markup is not None and isinstance(markup, list):
+ text = ""
+ for rep in markup:
+ if not isinstance(rep, dict):
+ # invalid markup - skip all processing
+ break
+ if rep.get("mimetype", "text/plain") == "text/plain":
+ rep_text = rep.get("body")
+ if rep_text is not None and isinstance(rep_text, str):
+ text = rep_text.lower()
+ break
+ result["content.body"] = text
+
return result
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index c2575ba3d9..93b255ced5 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -37,8 +37,8 @@ from synapse.push.push_types import (
TemplateVars,
)
from synapse.storage.databases.main.event_push_actions import EmailPushAction
-from synapse.storage.state import StateFilter
from synapse.types import StateMap, UserID
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index edeba27a45..7ee07e4bee 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,7 +17,6 @@ from synapse.events import EventBase
from synapse.push.presentable_names import calculate_room_name, name_from_member_event
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
-from synapse.util.async_helpers import concurrently_execute
async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
@@ -26,23 +25,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
badge = len(invites)
- room_notifs = []
-
- async def get_room_unread_count(room_id: str) -> None:
- room_notifs.append(
- await store.get_unread_event_push_actions_by_room_for_user(
- room_id,
- user_id,
- )
- )
-
- await concurrently_execute(get_room_unread_count, joins, 10)
-
- for notifs in room_notifs:
- # Combine the counts from all the threads.
- notify_count = notifs.main_timeline.notify_count + sum(
- n.notify_count for n in notifs.threads.values()
- )
+ room_to_count = await store.get_unread_counts_by_room_for_user(user_id)
+ for room_id, notify_count in room_to_count.items():
+ # room_to_count may include rooms which the user has left;
+ # ignore those.
+ if room_id not in joins:
+ continue
if notify_count == 0:
continue
@@ -51,8 +39,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
# return one badge count per conversation
badge += 1
else:
- # increment the badge count by the number of unread messages in the room
+ # Increase badge by number of notifications in room
+ # NOTE: this includes threaded and unthreaded notifications.
badge += notify_count
+
return badge
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 18252a2958..b4dad47b45 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -36,12 +36,14 @@ from synapse.replication.tcp.streams import (
TagAccountDataStream,
ToDeviceStream,
TypingStream,
+ UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
@@ -117,6 +119,7 @@ class ReplicationDataHandler:
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
+ self._state_storage_controller = hs.get_storage_controllers().state
self._notify_pushers = hs.config.worker.start_pushers
self._pusher_pool = hs.get_pusherpool()
@@ -236,6 +239,14 @@ class ReplicationDataHandler:
self.notifier.notify_user_joined_room(
row.data.event_id, row.data.room_id
)
+ elif stream_name == UnPartialStatedRoomStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedRoomStreamRow)
+
+ # Wake up any tasks waiting for the room to be un-partial-stated.
+ self._state_storage_controller.notify_room_un_partial_stated(
+ row.room_id
+ )
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index b1cd55bf6f..8575666d9c 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import (
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
STREAMS_MAP = {
stream.NAME: stream
@@ -61,6 +62,7 @@ STREAMS_MAP = {
TagAccountDataStream,
AccountDataStream,
UserSignatureStream,
+ UnPartialStatedRoomStream,
)
}
@@ -80,4 +82,5 @@ __all__ = [
"TagAccountDataStream",
"AccountDataStream",
"UserSignatureStream",
+ "UnPartialStatedRoomStream",
]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
new file mode 100644
index 0000000000..18f087ffa2
--- /dev/null
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -0,0 +1,48 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+import attr
+
+from synapse.replication.tcp.streams import Stream
+from synapse.replication.tcp.streams._base import current_token_without_instance
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedRoomStreamRow:
+ # ID of the room that has been un-partial-stated.
+ room_id: str
+
+
+class UnPartialStatedRoomStream(Stream):
+ """
+ Stream to notify about rooms becoming un-partial-stated;
+ that is, when the background sync finishes such that we now have full state for
+ the room.
+ """
+
+ NAME = "un_partial_stated_room"
+ ROW_TYPE = UnPartialStatedRoomStreamRow
+
+ def __init__(self, hs: "HomeServer"):
+ store = hs.get_datastores().main
+ super().__init__(
+ hs.get_instance_name(),
+ # TODO(faster_joins, multiple writers): we need to account for instance names
+ current_token_without_instance(store.get_un_partial_stated_rooms_token),
+ store.get_un_partial_stated_rooms_from_stream,
+ )
diff --git a/synapse/res/templates/_base.html b/synapse/res/templates/_base.html
index 46439fce6a..4b5cc7bcb6 100644
--- a/synapse/res/templates/_base.html
+++ b/synapse/res/templates/_base.html
@@ -13,13 +13,13 @@
<body>
<header class="mx_Header">
{% if app_name == "Riot" %}
- <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
+ <img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{% elif app_name == "Vector" %}
- <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
+ <img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{% elif app_name == "Element" %}
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
{% else %}
- <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
+ <img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
{% endif %}
</header>
diff --git a/synapse/res/templates/notice_expiry.html b/synapse/res/templates/notice_expiry.html
index 406397aaca..f62038e111 100644
--- a/synapse/res/templates/notice_expiry.html
+++ b/synapse/res/templates/notice_expiry.html
@@ -21,13 +21,13 @@
</td>
<td class="logo">
{% if app_name == "Riot" %}
- <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
+ <img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{% elif app_name == "Vector" %}
- <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
+ <img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{% elif app_name == "Element" %}
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
{% else %}
- <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
+ <img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
{% endif %}
</td>
</tr>
diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html
index 2add9dd859..7da0fff5e9 100644
--- a/synapse/res/templates/notif_mail.html
+++ b/synapse/res/templates/notif_mail.html
@@ -22,13 +22,13 @@
</td>
<td class="logo">
{%- if app_name == "Riot" %}
- <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
+ <img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{%- elif app_name == "Vector" %}
- <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
+ <img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{%- elif app_name == "Element" %}
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
{%- else %}
- <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
+ <img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
{%- endif %}
</td>
</tr>
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 28542cd774..14c4e6ebbb 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -29,7 +29,7 @@ from synapse.rest.client import (
initial_sync,
keys,
knock,
- login as v1_login,
+ login,
login_token_request,
logout,
mutual_rooms,
@@ -82,6 +82,10 @@ class ClientRestResource(JsonResource):
@staticmethod
def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
+ # Some servlets are only registered on the main process (and not worker
+ # processes).
+ is_main_process = hs.config.worker.worker_app is None
+
versions.register_servlets(hs, client_resource)
# Deprecated in r0
@@ -92,45 +96,58 @@ class ClientRestResource(JsonResource):
events.register_servlets(hs, client_resource)
room.register_servlets(hs, client_resource)
- v1_login.register_servlets(hs, client_resource)
+ login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
- directory.register_servlets(hs, client_resource)
+ if is_main_process:
+ directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
- pusher.register_servlets(hs, client_resource)
+ if is_main_process:
+ pusher.register_servlets(hs, client_resource)
push_rule.register_servlets(hs, client_resource)
- logout.register_servlets(hs, client_resource)
+ if is_main_process:
+ logout.register_servlets(hs, client_resource)
sync.register_servlets(hs, client_resource)
- filter.register_servlets(hs, client_resource)
+ if is_main_process:
+ filter.register_servlets(hs, client_resource)
account.register_servlets(hs, client_resource)
register.register_servlets(hs, client_resource)
- auth.register_servlets(hs, client_resource)
+ if is_main_process:
+ auth.register_servlets(hs, client_resource)
receipts.register_servlets(hs, client_resource)
read_marker.register_servlets(hs, client_resource)
room_keys.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
- tokenrefresh.register_servlets(hs, client_resource)
+ if is_main_process:
+ tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
account_data.register_servlets(hs, client_resource)
- report_event.register_servlets(hs, client_resource)
- openid.register_servlets(hs, client_resource)
- notifications.register_servlets(hs, client_resource)
+ if is_main_process:
+ report_event.register_servlets(hs, client_resource)
+ openid.register_servlets(hs, client_resource)
+ notifications.register_servlets(hs, client_resource)
devices.register_servlets(hs, client_resource)
- thirdparty.register_servlets(hs, client_resource)
+ if is_main_process:
+ thirdparty.register_servlets(hs, client_resource)
sendtodevice.register_servlets(hs, client_resource)
user_directory.register_servlets(hs, client_resource)
- room_upgrade_rest_servlet.register_servlets(hs, client_resource)
+ if is_main_process:
+ room_upgrade_rest_servlet.register_servlets(hs, client_resource)
room_batch.register_servlets(hs, client_resource)
- capabilities.register_servlets(hs, client_resource)
- account_validity.register_servlets(hs, client_resource)
+ if is_main_process:
+ capabilities.register_servlets(hs, client_resource)
+ account_validity.register_servlets(hs, client_resource)
relations.register_servlets(hs, client_resource)
- password_policy.register_servlets(hs, client_resource)
- knock.register_servlets(hs, client_resource)
+ if is_main_process:
+ password_policy.register_servlets(hs, client_resource)
+ knock.register_servlets(hs, client_resource)
# moving to /_synapse/admin
- admin.register_servlets_for_client_rest_resource(hs, client_resource)
+ if is_main_process:
+ admin.register_servlets_for_client_rest_resource(hs, client_resource)
# unstable
- mutual_rooms.register_servlets(hs, client_resource)
- login_token_request.register_servlets(hs, client_resource)
- rendezvous.register_servlets(hs, client_resource)
+ if is_main_process:
+ mutual_rooms.register_servlets(hs, client_resource)
+ login_token_request.register_servlets(hs, client_resource)
+ rendezvous.register_servlets(hs, client_resource)
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 747e6fda83..e957aa28ca 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -34,9 +34,9 @@ from synapse.rest.admin._base import (
assert_user_is_admin,
)
from synapse.storage.databases.main.room import RoomSortOrder
-from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
+from synapse.types.state import StateFilter
from synapse.util import json_decoder
if TYPE_CHECKING:
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 44f622bcce..b4b92f0c99 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -875,19 +875,21 @@ class AccountStatusRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- EmailPasswordRequestTokenRestServlet(hs).register(http_server)
- PasswordRestServlet(hs).register(http_server)
- DeactivateAccountRestServlet(hs).register(http_server)
- EmailThreepidRequestTokenRestServlet(hs).register(http_server)
- MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
- AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
- AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+ PasswordRestServlet(hs).register(http_server)
+ DeactivateAccountRestServlet(hs).register(http_server)
+ EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+ MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
+ AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
+ AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
- ThreepidAddRestServlet(hs).register(http_server)
- ThreepidBindRestServlet(hs).register(http_server)
- ThreepidUnbindRestServlet(hs).register(http_server)
- ThreepidDeleteRestServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ ThreepidAddRestServlet(hs).register(http_server)
+ ThreepidBindRestServlet(hs).register(http_server)
+ ThreepidUnbindRestServlet(hs).register(http_server)
+ ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
- if hs.config.experimental.msc3720_enabled:
+ if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
AccountStatusRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 69b803f9f8..486c6dbbc5 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -342,8 +342,10 @@ class ClaimDehydratedDeviceServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- DeleteDevicesRestServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
- DeviceRestServlet(hs).register(http_server)
- DehydratedDeviceServlet(hs).register(http_server)
- ClaimDehydratedDeviceServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ DeviceRestServlet(hs).register(http_server)
+ DehydratedDeviceServlet(hs).register(http_server)
+ ClaimDehydratedDeviceServlet(hs).register(http_server)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index ee038c7192..7873b363c0 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -376,5 +376,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
KeyQueryServlet(hs).register(http_server)
KeyChangesServlet(hs).register(http_server)
OneTimeKeyServlet(hs).register(http_server)
- SigningKeyUploadServlet(hs).register(http_server)
- SignaturesUploadServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ SigningKeyUploadServlet(hs).register(http_server)
+ SignaturesUploadServlet(hs).register(http_server)
diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py
index 18a282b22c..28b7d30ea8 100644
--- a/synapse/rest/client/receipts.py
+++ b/synapse/rest/client/receipts.py
@@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
-from synapse.types import JsonDict
+from synapse.types import EventID, JsonDict, RoomID
from ._base import client_patterns
@@ -56,6 +56,9 @@ class ReceiptRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
+ if not RoomID.is_valid(room_id) or not event_id.startswith(EventID.SIGIL):
+ raise SynapseError(400, "A valid room ID and event ID must be specified")
+
if receipt_type not in self._known_receipt_types:
raise SynapseError(
400,
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index de810ae3ec..3cb1e7e375 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -949,9 +949,10 @@ def _calculate_registration_flows(
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- EmailRegisterRequestTokenRestServlet(hs).register(http_server)
- MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
- UsernameAvailabilityRestServlet(hs).register(http_server)
- RegistrationSubmitTokenServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ EmailRegisterRequestTokenRestServlet(hs).register(http_server)
+ MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
+ UsernameAvailabilityRestServlet(hs).register(http_server)
+ RegistrationSubmitTokenServlet(hs).register(http_server)
RegistrationTokenValidityRestServlet(hs).register(http_server)
RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 91cb791139..790614d721 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -55,9 +55,9 @@ from synapse.logging.opentracing import set_tag
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
-from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID
+from synapse.types.state import StateFilter
from synapse.util import json_decoder
from synapse.util.cancellation import cancellable
from synapse.util.stringutils import parse_and_validate_server_name, random_string
@@ -396,12 +396,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
- try:
- content = parse_json_object_from_request(request)
- except Exception:
- # Turns out we used to ignore the body entirely, and some clients
- # cheekily send invalid bodies.
- content = {}
+ content = parse_json_object_from_request(request, allow_empty_body=True)
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
@@ -952,12 +947,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
}:
raise AuthError(403, "Guest access not allowed")
- try:
- content = parse_json_object_from_request(request)
- except Exception:
- # Turns out we used to ignore the body entirely, and some clients
- # cheekily send invalid bodies.
- content = {}
+ content = parse_json_object_from_request(request, allow_empty_body=True)
if membership_action == "invite" and all(
key in content for key in ("medium", "address")
@@ -1284,17 +1274,14 @@ class TimestampLookupRestServlet(RestServlet):
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.
- GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
+ GET /_matrix/client/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""
PATTERNS = (
- re.compile(
- "^/_matrix/client/unstable/org.matrix.msc3030"
- "/rooms/(?P<room_id>[^/]*)/timestamp_to_event$"
- ),
+ re.compile("^/_matrix/client/v1/rooms/(?P<room_id>[^/]*)/timestamp_to_event$"),
)
def __init__(self, hs: "HomeServer"):
@@ -1398,9 +1385,7 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet):
)
-def register_servlets(
- hs: "HomeServer", http_server: HttpServer, is_worker: bool = False
-) -> None:
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
RoomStateEventRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server)
JoinedRoomMemberListRestServlet(hs).register(http_server)
@@ -1421,11 +1406,10 @@ def register_servlets(
RoomAliasListServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
RoomCreateRestServlet(hs).register(http_server)
- if hs.config.experimental.msc3030_enabled:
- TimestampLookupRestServlet(hs).register(http_server)
+ TimestampLookupRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process.
- if not is_worker:
+ if hs.config.worker.worker_app is None:
RoomForgetRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py
index 46a8b03829..55d52f0b28 100644
--- a/synapse/rest/client/sendtodevice.py
+++ b/synapse/rest/client/sendtodevice.py
@@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
diff --git a/synapse/rest/client/user_directory.py b/synapse/rest/client/user_directory.py
index 116c982ce6..4670fad608 100644
--- a/synapse/rest/client/user_directory.py
+++ b/synapse/rest/client/user_directory.py
@@ -63,8 +63,8 @@ class UserDirectorySearchRestServlet(RestServlet):
body = parse_json_object_from_request(request)
- limit = body.get("limit", 10)
- limit = min(limit, 50)
+ limit = int(body.get("limit", 10))
+ limit = max(min(limit, 50), 0)
try:
search_term = body["search_term"]
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 180a11ef88..e19c0946c0 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -77,6 +77,7 @@ class VersionsRestServlet(RestServlet):
"v1.2",
"v1.3",
"v1.4",
+ "v1.5",
],
# as per MSC1497:
"unstable_features": {
@@ -101,8 +102,6 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc3827.stable": True,
# Adds support for importing historical messages as per MSC2716
"org.matrix.msc2716": self.config.experimental.msc2716_enabled,
- # Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030
- "org.matrix.msc3030": self.config.experimental.msc3030_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
# Support for thread read receipts & notification counts.
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 40b0d39eb2..c70e1837af 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -24,7 +24,6 @@ from matrix_common.types.mxc_uri import MXCUri
import twisted.internet.error
import twisted.web.http
from twisted.internet.defer import Deferred
-from twisted.web.resource import Resource
from synapse.api.errors import (
FederationDeniedError,
@@ -35,6 +34,7 @@ from synapse.api.errors import (
)
from synapse.config._base import ConfigError
from synapse.config.repository import ThumbnailRequirement
+from synapse.http.server import UnrecognizedRequestResource
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -1046,7 +1046,7 @@ class MediaRepository:
return removed_media, len(removed_media)
-class MediaRepositoryResource(Resource):
+class MediaRepositoryResource(UnrecognizedRequestResource):
"""File uploading and downloading.
Uploads are POSTed to a resource which returns a token which is used to GET
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 833ffec3de..ee5469d5a8 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -44,8 +44,8 @@ from synapse.logging.context import ContextResourceUsage
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import StateMap
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 2056ecb2c3..a99aea8926 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -544,6 +544,47 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index.
"""
+ async def updater(progress: JsonDict, batch_size: int) -> int:
+ await self.create_index_in_background(
+ index_name=index_name,
+ table=table,
+ columns=columns,
+ where_clause=where_clause,
+ unique=unique,
+ psql_only=psql_only,
+ replaces_index=replaces_index,
+ )
+ await self._end_background_update(update_name)
+ return 1
+
+ self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
+ updater, oneshot=True
+ )
+
+ async def create_index_in_background(
+ self,
+ index_name: str,
+ table: str,
+ columns: Iterable[str],
+ where_clause: Optional[str] = None,
+ unique: bool = False,
+ psql_only: bool = False,
+ replaces_index: Optional[str] = None,
+ ) -> None:
+ """Add an index in the background.
+
+ Args:
+ index_name: name of index to add
+ table: table to add index to
+ columns: columns/expressions to include in index
+ where_clause: A WHERE clause to specify a partial unique index.
+ unique: true to make a UNIQUE index
+ psql_only: true to only create this index on psql databases (useful
+ for virtual sqlite tables)
+ replaces_index: The name of an index that this index replaces.
+ The named index will be dropped upon completion of the new index.
+ """
+
def create_index_psql(conn: Connection) -> None:
conn.rollback()
# postgres insists on autocommit for the index
@@ -618,16 +659,11 @@
else:
runner = create_index_sqlite
- async def updater(progress: JsonDict, batch_size: int) -> int:
- if runner is not None:
- logger.info("Adding index %s to %s", index_name, table)
- await self.db_pool.runWithConnection(runner)
- await self._end_background_update(update_name)
- return 1
+ if runner is None:
+ return
- self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
- updater, oneshot=True
- )
+ logger.info("Adding index %s to %s", index_name, table)
+ await self.db_pool.runWithConnection(runner)
async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 33ffef521b..f1d2c71c91 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -58,13 +58,13 @@ from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
StateMap,
get_domain_from_id,
)
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results
from synapse.util.metrics import Measure
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 2b31ce54bb..26d79c6e62 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -31,12 +31,12 @@ from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage.roommember import ProfileInfo
-from synapse.storage.state import StateFilter
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateMap
+from synapse.types.state import StateFilter
from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index a14b13aec8..55bcb90001 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1129,7 +1129,6 @@ class DatabasePool:
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
- lock: bool = True,
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
@@ -1154,21 +1153,12 @@ class DatabasePool:
requiring that a unique index exist on the column names used to detect a
conflict (i.e. `keyvalues.keys()`).
- If there is no such index, we can "emulate" an upsert with a SELECT followed
- by either an INSERT or an UPDATE. This is unsafe: we cannot make the same
- atomicity guarantees that a native upsert can and are very vulnerable to races
- and crashes. Therefore if we wish to upsert without an appropriate unique index,
- we must either:
-
- 1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
- 2. VERY CAREFULLY ensure that we are the only thread and worker which will be
- writing to this table, in which case we can proceed without a lock
- (`lock=False`).
-
- Generally speaking, you should use `lock=True`. If the table in question has a
- unique index[*], this class will use a native upsert (which is atomic and so can
- ignore the `lock` argument). Otherwise this class will use an emulated upsert,
- in which case we want the safer option unless we been VERY CAREFUL.
+ If there is no such index yet[*], we can "emulate" an upsert with a SELECT
+ followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters
+ run at the SERIALIZABLE isolation level: we cannot make the same atomicity
+ guarantees that a native upsert can and are very vulnerable to races and
+ crashes. Therefore to upsert without an appropriate unique index, we acquire a
+ table-level lock before the emulated upsert.
[*]: Some tables have unique indices added to them in the background. Those
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
@@ -1189,7 +1179,6 @@ class DatabasePool:
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
desc: description of the transaction, for logging and metrics
- lock: True to lock the table when doing the upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
@@ -1209,7 +1198,6 @@ class DatabasePool:
keyvalues,
values,
insertion_values,
- lock=lock,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
@@ -1232,7 +1220,6 @@ class DatabasePool:
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None,
- lock: bool = True,
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
@@ -1245,8 +1232,6 @@ class DatabasePool:
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
- lock: True to lock the table when doing the upsert. Unused when performing
- a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
@@ -1270,7 +1255,6 @@ class DatabasePool:
values,
insertion_values=insertion_values,
where_clause=where_clause,
- lock=lock,
)
def simple_upsert_txn_emulated(
@@ -1291,14 +1275,15 @@ class DatabasePool:
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert.
+ Must not be False unless the table has already been locked.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
- # We need to lock the table :(, unless we're *really* careful
if lock:
+ # We need to lock the table :(
self.engine.lock_table(txn, table)
def _getwhere(key: str) -> str:
@@ -1406,7 +1391,6 @@ class DatabasePool:
value_names: Collection[str],
value_values: Collection[Collection[Any]],
desc: str,
- lock: bool = True,
) -> None:
"""
Upsert, many times.
@@ -1418,8 +1402,6 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused when performing
- a native upsert.
"""
# We can autocommit if it safe to upsert
@@ -1433,7 +1415,6 @@ class DatabasePool:
key_values,
value_names,
value_values,
- lock=lock,
db_autocommit=autocommit,
)
@@ -1445,7 +1426,6 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
- lock: bool = True,
) -> None:
"""
Upsert, many times.
@@ -1457,8 +1437,6 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused when performing
- a native upsert.
"""
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
@@ -1466,7 +1444,12 @@ class DatabasePool:
)
else:
return self.simple_upsert_many_txn_emulated(
- txn, table, key_names, key_values, value_names, value_values, lock=lock
+ txn,
+ table,
+ key_names,
+ key_values,
+ value_names,
+ value_values,
)
def simple_upsert_many_txn_emulated(
@@ -1477,7 +1460,6 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
- lock: bool = True,
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
@@ -1489,18 +1471,16 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]
- if lock:
- # Lock the table just once, to prevent it being done once per row.
- # Note that, according to Postgres' documentation, once obtained,
- # the lock is held for the remainder of the current transaction.
- self.engine.lock_table(txn, "user_ips")
+ # Lock the table just once, to prevent it being done once per row.
+ # Note that, according to Postgres' documentation, once obtained,
+ # the lock is held for the remainder of the current transaction.
+ self.engine.lock_table(txn, "user_ips")
for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 282687ebce..07908c41d9 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -449,9 +449,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
content_json = json_encoder.encode(content)
async with self._account_data_id_gen.get_next() as next_id:
- # no need to lock here as room_account_data has a unique constraint
- # on (user_id, room_id, account_data_type) so simple_upsert will
- # retry if there is a conflict.
await self.db_pool.simple_upsert(
desc="add_room_account_data",
table="room_account_data",
@@ -461,7 +458,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
"account_data_type": account_data_type,
},
values={"stream_id": next_id, "content": content_json},
- lock=False,
)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -517,15 +513,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) -> None:
content_json = json_encoder.encode(content)
- # no need to lock here as account_data has a unique constraint on
- # (user_id, account_data_type) so simple_upsert will retry if
- # there is a conflict.
self.db_pool.simple_upsert_txn(
txn,
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
- lock=False,
)
# Ignored users get denormalized into a separate table as an optimisation.
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 63046c0527..c2c8018ee2 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -20,7 +20,7 @@ from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
AppServiceTransaction,
- TransactionOneTimeKeyCounts,
+ TransactionOneTimeKeysCount,
TransactionUnusedFallbackKeys,
)
from synapse.config.appservice import load_appservices
@@ -260,7 +260,7 @@ class ApplicationServiceTransactionWorkerStore(
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
- one_time_key_counts: TransactionOneTimeKeyCounts,
+ one_time_keys_count: TransactionOneTimeKeysCount,
unused_fallback_keys: TransactionUnusedFallbackKeys,
device_list_summary: DeviceListUpdates,
) -> AppServiceTransaction:
@@ -273,7 +273,7 @@ class ApplicationServiceTransactionWorkerStore(
events: A list of persistent events to put in the transaction.
ephemeral: A list of ephemeral events to put in the transaction.
to_device_messages: A list of to-device messages to put in the transaction.
- one_time_key_counts: Counts of remaining one-time keys for relevant
+ one_time_keys_count: Counts of remaining one-time keys for relevant
appservice devices in the transaction.
unused_fallback_keys: Lists of unused fallback keys for relevant
appservice devices in the transaction.
@@ -299,7 +299,7 @@ class ApplicationServiceTransactionWorkerStore(
events=events,
ephemeral=ephemeral,
to_device_messages=to_device_messages,
- one_time_key_counts=one_time_key_counts,
+ one_time_keys_count=one_time_keys_count,
unused_fallback_keys=unused_fallback_keys,
device_list_summary=device_list_summary,
)
@@ -379,7 +379,7 @@ class ApplicationServiceTransactionWorkerStore(
events=events,
ephemeral=[],
to_device_messages=[],
- one_time_key_counts={},
+ one_time_keys_count={},
unused_fallback_keys={},
device_list_summary=DeviceListUpdates(),
)
@@ -451,8 +451,6 @@ class ApplicationServiceTransactionWorkerStore(
table="application_services_state",
keyvalues={"as_id": service.id},
values={f"{stream_type}_stream_id": pos},
- # no need to lock when emulating upsert: as_id is a unique key
- lock=False,
desc="set_appservice_stream_type_pos",
)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..48a54d9cb8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -26,8 +26,15 @@ from typing import (
cast,
)
+from synapse.api.constants import EventContentFields
from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.opentracing import (
+ SynapseTags,
+ log_kv,
+ set_tag,
+ start_active_span,
+ trace,
+)
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(recipient_user_id, recipient_device_id), []
).append(message_dict)
+ # start a new span for each message, so that we can tag each separately
+ with start_active_span("get_to_device_message"):
+ set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
+ set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
+ set_tag(
+ SynapseTags.TO_DEVICE_MSGID,
+ message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
+ )
+
if limit is not None and rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that
@@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
- if remote_messages_by_destination:
- issue9533_logger.debug(
- "Queued outgoing to-device messages with stream_id %i for %s",
- stream_id,
- list(remote_messages_by_destination.keys()),
- )
+ for destination, edu in remote_messages_by_destination.items():
+ if issue9533_logger.isEnabledFor(logging.DEBUG):
+ issue9533_logger.debug(
+ "Queued outgoing to-device messages with "
+ "stream_id %i, EDU message_id %s, type %s for %s: %s",
+ stream_id,
+ edu["message_id"],
+ edu["type"],
+ destination,
+ [
+ f"{user_id}/{device_id} (msgid "
+ f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
+ for (user_id, messages_by_device) in edu["messages"].items()
+ for (device_id, msg) in messages_by_device.items()
+ ],
+ )
+
+ for (user_id, messages_by_device) in edu["messages"].items():
+ for (device_id, msg) in messages_by_device.items():
+ with start_active_span("store_outgoing_to_device_message"):
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
+ set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+ set_tag(
+ SynapseTags.TO_DEVICE_MSGID,
+ msg.get(EventContentFields.TO_DEVICE_MSGID),
+ )
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
@@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# Only insert into the local inbox if the device exists on
# this server
device_id = row["device_id"]
- message_json = json_encoder.encode(messages_by_device[device_id])
+
+ with start_active_span("serialise_to_device_message"):
+ msg = messages_by_device[device_id]
+ set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
+ set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+ set_tag(
+ SynapseTags.TO_DEVICE_MSGID,
+ msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
+ )
+ message_json = json_encoder.encode(msg)
+
messages_json_for_user[device_id] = message_json
if messages_json_for_user:
@@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
- issue9533_logger.debug(
- "Stored to-device messages with stream_id %i for %s",
- stream_id,
- [
- (user_id, device_id)
- for (user_id, messages_by_device) in local_by_user_then_device.items()
- for device_id in messages_by_device.keys()
- ],
- )
+ if issue9533_logger.isEnabledFor(logging.DEBUG):
+ issue9533_logger.debug(
+ "Stored to-device messages with stream_id %i: %s",
+ stream_id,
+ [
+ f"{user_id}/{device_id} (msgid "
+ f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
+ for (
+ user_id,
+ messages_by_device,
+ ) in messages_by_user_then_device.items()
+ for (device_id, msg) in messages_by_device.items()
+ ],
+ )
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 05a193f889..a5bb4d404e 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -58,7 +58,10 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.caches.stream_change_cache import (
+ AllEntitiesChangedResult,
+ StreamChangeCache,
+)
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -799,7 +802,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def get_cached_device_list_changes(
self,
from_key: int,
- ) -> Optional[List[str]]:
+ ) -> AllEntitiesChangedResult:
"""Get set of users whose devices have changed since `from_key`, or None
if that information is not in our cache.
"""
@@ -807,10 +810,58 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return self._device_list_stream_cache.get_all_entities_changed(from_key)
@cancellable
+ async def get_all_devices_changed(
+ self,
+ from_key: int,
+ to_key: int,
+ ) -> Set[str]:
+ """Get all users whose devices have changed in the given range.
+
+ Args:
+ from_key: The minimum device lists stream token to query device list
+ changes for, exclusive.
+ to_key: The maximum device lists stream token to query device list
+ changes for, inclusive.
+
+ Returns:
+ The set of user_ids whose devices have changed since `from_key`
+ (exclusive) until `to_key` (inclusive).
+ """
+
+ result = self._device_list_stream_cache.get_all_entities_changed(from_key)
+
+ if result.hit:
+ # We know which users might have changed devices.
+ if not result.entities:
+ # If no users then we can return early.
+ return set()
+
+ # Otherwise we need to filter down the list
+ return await self.get_users_whose_devices_changed(
+ from_key, result.entities, to_key
+ )
+
+ # If the cache didn't tell us anything, we just need to query the full
+ # range.
+ sql = """
+ SELECT DISTINCT user_id FROM device_lists_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ """
+
+ rows = await self.db_pool.execute(
+ "get_all_devices_changed",
+ None,
+ sql,
+ from_key,
+ to_key,
+ )
+ return {u for u, in rows}
+
+ @cancellable
async def get_users_whose_devices_changed(
self,
from_key: int,
- user_ids: Optional[Collection[str]] = None,
+ user_ids: Collection[str],
to_key: Optional[int] = None,
) -> Set[str]:
"""Get set of users whose devices have changed since `from_key` that
@@ -830,46 +881,31 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
"""
# Get set of users who *may* have changed. Users not in the returned
# list have definitely not changed.
- user_ids_to_check: Optional[Collection[str]]
- if user_ids is None:
- # Get set of all users that have had device list changes since 'from_key'
- user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed(
- from_key
- )
- else:
- # The same as above, but filter results to only those users in 'user_ids'
- user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
- user_ids, from_key
- )
+ user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
+ user_ids, from_key
+ )
+ # If an empty set was returned, there's nothing to do.
if not user_ids_to_check:
return set()
- def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
- changes: Set[str] = set()
-
- stream_id_where_clause = "stream_id > ?"
- sql_args = [from_key]
-
- if to_key:
- stream_id_where_clause += " AND stream_id <= ?"
- sql_args.append(to_key)
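+        # If no upper bound was given, default to the current device lists stream token.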
+ if to_key is None:
+ to_key = self._device_list_id_gen.get_current_token()
- sql = f"""
+ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
+ sql = """
SELECT DISTINCT user_id FROM device_lists_stream
- WHERE {stream_id_where_clause}
- AND
+ WHERE ? < stream_id AND stream_id <= ? AND %s
"""
+ changes: Set[str] = set()
+
# Query device changes with a batch of users at a time
- # Assertion for mypy's benefit; see also
- # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
- assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
- txn.execute(sql + clause, sql_args + args)
+ txn.execute(sql % (clause,), [from_key, to_key] + args)
changes.update(user_id for user_id, in txn)
return changes
@@ -1744,9 +1780,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_cache",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"content": json_encoder.encode(content)},
- # we don't need to lock, because we assume we are the only thread
- # updating this user's devices.
- lock=False,
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
@@ -1760,9 +1793,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
- # again, we can assume we are the only thread updating this user's
- # extremity.
- lock=False,
)
async def update_remote_device_list_cache(
@@ -1815,9 +1845,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
- # we don't need to lock, because we can assume we are the only thread
- # updating this user's extremity.
- lock=False,
)
async def add_device_change_to_streams(
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index cf33e73e2b..4c691642e2 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -33,7 +33,7 @@ from typing_extensions import Literal
from synapse.api.constants import DeviceKeyAlgorithms
from synapse.appservice import (
- TransactionOneTimeKeyCounts,
+ TransactionOneTimeKeysCount,
TransactionUnusedFallbackKeys,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
@@ -140,7 +140,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
@cancellable
async def get_e2e_device_keys_for_cs_api(
self,
- query_list: List[Tuple[str, Optional[str]]],
+ query_list: Collection[Tuple[str, Optional[str]]],
include_displaynames: bool = True,
) -> Dict[str, Dict[str, JsonDict]]:
"""Fetch a list of device keys, formatted suitably for the C/S API.
@@ -514,7 +514,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
async def count_bulk_e2e_one_time_keys_for_as(
self, user_ids: Collection[str]
- ) -> TransactionOneTimeKeyCounts:
+ ) -> TransactionOneTimeKeysCount:
"""
Counts, in bulk, the one-time keys for all the users specified.
Intended to be used by application services for populating OTK counts in
@@ -528,7 +528,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
def _count_bulk_e2e_one_time_keys_txn(
txn: LoggingTransaction,
- ) -> TransactionOneTimeKeyCounts:
+ ) -> TransactionOneTimeKeysCount:
user_in_where_clause, user_parameters = make_in_list_sql_clause(
self.database_engine, "user_id", user_ids
)
@@ -541,7 +541,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"""
txn.execute(sql, user_parameters)
- result: TransactionOneTimeKeyCounts = {}
+ result: TransactionOneTimeKeysCount = {}
for user_id, device_id, algorithm, count in txn:
# We deliberately construct empty dictionaries for
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 309a4ba664..bbee02ab18 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1686,7 +1686,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
},
insertion_values={},
desc="insert_insertion_extremity",
- lock=False,
)
async def insert_received_event_to_staging(
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b283ab0f9c..7ebe34f773 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -74,6 +74,7 @@ receipt.
"""
import logging
+from collections import defaultdict
from typing import (
TYPE_CHECKING,
Collection,
@@ -95,6 +96,7 @@ from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
+ PostgresEngine,
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -463,6 +465,153 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return result
+ async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
+ """Get the notification count by room for a user. Only considers notifications,
+ not highlight or unread counts, and threads are currently aggregated under their room.
+
+ This function is intentionally not cached because it is called to calculate the
+ unread badge for push notifications and thus the result is expected to change.
+
+        Note that this function assumes the user is a member of the room. Summary
+        rows are not removed when a user leaves a room, so the caller must filter
+        rooms the user has left out of the result.
+
+ Returns:
+ A map of room ID to notification counts for the given user.
+ """
+ return await self.db_pool.runInteraction(
+ "get_unread_counts_by_room_for_user",
+ self._get_unread_counts_by_room_for_user_txn,
+ user_id,
+ )
+
+ def _get_unread_counts_by_room_for_user_txn(
+ self, txn: LoggingTransaction, user_id: str
+ ) -> Dict[str, int]:
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+ )
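+        # The user ID is bound twice: once in the receipts CTE and once in the
+        # outer query against the summary / push actions tables.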
+ args.extend([user_id, user_id])
+
+ receipts_cte = f"""
+ WITH all_receipts AS (
+ SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
+ FROM receipts_linearized
+ LEFT JOIN events USING (room_id, event_id)
+ WHERE
+ {receipt_types_clause}
+ AND user_id = ?
+ GROUP BY room_id, thread_id
+ )
+ """
+
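+        # Join each candidate row against the user's latest threaded receipt (matched
+        # on room and thread) and their latest unthreaded receipt (matched on room only).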
+ receipts_joins = """
+ LEFT JOIN (
+ SELECT room_id, thread_id,
+ max_receipt_stream_ordering AS threaded_receipt_stream_ordering
+ FROM all_receipts
+ WHERE thread_id IS NOT NULL
+ ) AS threaded_receipts USING (room_id, thread_id)
+ LEFT JOIN (
+ SELECT room_id, thread_id,
+ max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
+ FROM all_receipts
+ WHERE thread_id IS NULL
+ ) AS unthreaded_receipts USING (room_id)
+ """
+
+ # First get summary counts by room / thread for the user. We use the max receipt
+ # stream ordering of both threaded & unthreaded receipts to compare against the
+ # summary table.
+ #
+ # PostgreSQL and SQLite differ in comparing scalar numerics.
+ if isinstance(self.database_engine, PostgresEngine):
+ # GREATEST ignores NULLs.
+ max_clause = """GREATEST(
+ threaded_receipt_stream_ordering,
+ unthreaded_receipt_stream_ordering
+ )"""
+ else:
+ # MAX returns NULL if any are NULL, so COALESCE to 0 first.
+ max_clause = """MAX(
+ COALESCE(threaded_receipt_stream_ordering, 0),
+ COALESCE(unthreaded_receipt_stream_ordering, 0)
+ )"""
+
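+        # Only use summary rows that are still valid for this user's receipts: either
+        # no receipt is recorded against the row and it is ahead of the user's latest
+        # receipt, or the recorded receipt matches that latest receipt.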
+ sql = f"""
+ {receipts_cte}
+ SELECT eps.room_id, eps.thread_id, notif_count
+ FROM event_push_summary AS eps
+ {receipts_joins}
+ WHERE user_id = ?
+ AND notif_count != 0
+ AND (
+ (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause})
+ OR last_receipt_stream_ordering = {max_clause}
+ )
+ """
+ txn.execute(sql, args)
+
+ seen_thread_ids = set()
+ room_to_count: Dict[str, int] = defaultdict(int)
+
+ for room_id, thread_id, notif_count in txn:
+ room_to_count[room_id] += notif_count
+ seen_thread_ids.add(thread_id)
+
+        # Now count any event push actions that haven't yet been rotated into the
+        # summary table, using the same receipt joins and only considering events
+        # after the stream ordering up to which the summary has been rotated.
+ sql = f"""
+ {receipts_cte}
+ SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+ FROM event_push_actions AS epa
+ {receipts_joins}
+ WHERE user_id = ?
+ AND epa.notif = 1
+ AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
+ AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+ AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+ GROUP BY epa.room_id, epa.thread_id
+ """
+ txn.execute(sql, args)
+
+ for room_id, thread_id, notif_count in txn:
+            # Only count actions for threads we already have a summary row for (with
+            # an up-to-date receipt); other threads are handled by the query below.
+ if thread_id not in seen_thread_ids:
+ continue
+ room_to_count[room_id] += notif_count
+
+ thread_id_clause, thread_ids_args = make_in_list_sql_clause(
+ self.database_engine, "epa.thread_id", seen_thread_ids
+ )
+
+        # Finally, re-check event_push_actions for any threads not covered by the
+        # summary, this time ignoring the rotated-up-to position. This handles the
+        # case where a read receipt has arrived but has not yet been rotated, meaning
+        # the summary table is out of date, so we fall back to the push actions table.
+ sql = f"""
+ {receipts_cte}
+ SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+ FROM event_push_actions AS epa
+ {receipts_joins}
+ WHERE user_id = ?
+ AND NOT {thread_id_clause}
+ AND epa.notif = 1
+ AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+ AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+ GROUP BY epa.room_id
+ """
+
+ args.extend(thread_ids_args)
+ txn.execute(sql, args)
+
+ for room_id, notif_count in txn:
+ room_to_count[room_id] += notif_count
+
+ return room_to_count
+
@cached(tree=True, max_entries=5000, iterable=True)
async def get_unread_event_push_actions_by_room_for_user(
self,
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 12ad44dbb3..d4c64c46ad 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -84,7 +84,10 @@ def _load_rules(
push_rules = PushRules(ruleslist)
filtered_rules = FilteredPushRules(
- push_rules, enabled_map, msc3664_enabled=experimental_config.msc3664_enabled
+ push_rules,
+ enabled_map,
+ msc3664_enabled=experimental_config.msc3664_enabled,
+ msc1767_enabled=experimental_config.msc1767_enabled,
)
return filtered_rules
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index fee37b9ce4..40fd781a6a 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -325,14 +325,11 @@ class PusherWorkerStore(SQLBaseStore):
async def set_throttle_params(
self, pusher_id: str, room_id: str, params: ThrottleParams
) -> None:
- # no need to lock because `pusher_throttle` has a primary key on
- # (pusher, room_id) so simple_upsert will retry
await self.db_pool.simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
desc="set_throttle_params",
- lock=False,
)
async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
@@ -589,8 +586,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
device_id: Optional[str] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
- # no need to lock because `pushers` has a unique key on
- # (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
@@ -609,7 +604,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
"device_id": device_id,
},
desc="add_pusher",
- lock=False,
)
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index a580e4bdda..e06725f69c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -924,39 +924,6 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
return batch_size
- async def _create_receipts_index(self, index_name: str, table: str) -> None:
- """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
- receipts table, for non-thread receipts."""
-
- def _create_index(conn: LoggingDatabaseConnection) -> None:
- conn.rollback()
-
- # we have to set autocommit, because postgres refuses to
- # CREATE INDEX CONCURRENTLY without it.
- if isinstance(self.database_engine, PostgresEngine):
- conn.set_session(autocommit=True)
-
- try:
- c = conn.cursor()
-
- # Now that the duplicates are gone, we can create the index.
- concurrently = (
- "CONCURRENTLY"
- if isinstance(self.database_engine, PostgresEngine)
- else ""
- )
- sql = f"""
- CREATE UNIQUE INDEX {concurrently} {index_name}
- ON {table}(room_id, receipt_type, user_id)
- WHERE thread_id IS NULL
- """
- c.execute(sql)
- finally:
- if isinstance(self.database_engine, PostgresEngine):
- conn.set_session(autocommit=False)
-
- await self.db_pool.runWithConnection(_create_index)
-
async def _background_receipts_linearized_unique_index(
self, progress: dict, batch_size: int
) -> int:
@@ -999,9 +966,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
_remote_duplicate_receipts_txn,
)
- await self._create_receipts_index(
- "receipts_linearized_unique_index",
- "receipts_linearized",
+ await self.db_pool.updates.create_index_in_background(
+ index_name="receipts_linearized_unique_index",
+ table="receipts_linearized",
+ columns=["room_id", "receipt_type", "user_id"],
+ where_clause="thread_id IS NULL",
+ unique=True,
)
await self.db_pool.updates._end_background_update(
@@ -1050,9 +1020,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
_remote_duplicate_receipts_txn,
)
- await self._create_receipts_index(
- "receipts_graph_unique_index",
- "receipts_graph",
+ await self.db_pool.updates.create_index_in_background(
+ index_name="receipts_graph_unique_index",
+ table="receipts_graph",
+ columns=["room_id", "receipt_type", "user_id"],
+ where_clause="thread_id IS NULL",
+ unique=True,
)
await self.db_pool.updates._end_background_update(
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 52ad947c6c..78906a5e1d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1,5 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -50,8 +50,14 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import IdGenerator
+from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
+ IdGenerator,
+ MultiWriterIdGenerator,
+ StreamIdGenerator,
+)
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self.config: HomeServerConfig = hs.config
+ self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
+
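+        # `MultiWriterIdGenerator` relies on a PostgreSQL sequence; SQLite deployments
+        # only ever have a single writer, so a plain `StreamIdGenerator` is enough there.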
+ if isinstance(database.engine, PostgresEngine):
+ self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="un_partial_stated_room_stream",
+ instance_name=self._instance_name,
+ tables=[
+ ("un_partial_stated_room_stream", "instance_name", "stream_id")
+ ],
+ sequence_name="un_partial_stated_room_stream_sequence",
+ # TODO(faster_joins, multiple writers) Support multiple writers.
+ writers=["master"],
+ )
+ else:
+ self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
+ db_conn, "un_partial_stated_room_stream", "stream_id"
+ )
+
async def store_room(
self,
room_id: str,
@@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
- async def clear_partial_state_room(self, room_id: str) -> bool:
- """Clears the partial state flag for a room.
-
- Args:
- room_id: The room whose partial state flag is to be cleared.
-
- Returns:
- `True` if the partial state flag has been cleared successfully.
-
- `False` if the partial state flag could not be cleared because the room
- still contains events with partial state.
- """
- try:
- await self.db_pool.runInteraction(
- "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
- )
- return True
- except self.db_pool.engine.module.IntegrityError as e:
- # Assume that any `IntegrityError`s are due to partial state events.
- logger.info(
- "Exception while clearing lazy partial-state-room %s, retrying: %s",
- room_id,
- e,
- )
- return False
-
- def _clear_partial_state_room_txn(
- self, txn: LoggingTransaction, room_id: str
- ) -> None:
- DatabasePool.simple_delete_txn(
- txn,
- table="partial_state_rooms_servers",
- keyvalues={"room_id": room_id},
- )
- DatabasePool.simple_delete_one_txn(
- txn,
- table="partial_state_rooms",
- keyvalues={"room_id": room_id},
- )
- self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
- self._invalidate_cache_and_stream(
- txn, self.get_partial_state_servers_at_join, (room_id,)
- )
-
- # We now delete anything from `device_lists_remote_pending` with a
- # stream ID less than the minimum
- # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
- device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
- txn,
- table="partial_state_rooms",
- keyvalues={},
- retcol="MIN(device_lists_stream_id)",
- allow_none=True,
- )
- if device_lists_stream_id is None:
- # There are no rooms being currently partially joined, so we delete everything.
- txn.execute("DELETE FROM device_lists_remote_pending")
- else:
- sql = """
- DELETE FROM device_lists_remote_pending
- WHERE stream_id <= ?
- """
- txn.execute(sql, (device_lists_stream_id,))
-
@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
return result["join_event_id"], result["device_lists_stream_id"]
+ def get_un_partial_stated_rooms_token(self) -> int:
+ # TODO(faster_joins, multiple writers): This is inappropriate if there
+ # are multiple writers because workers that don't write often will
+ # hold all readers up.
+ # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
+ # explanation.)
+ return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+
+ async def get_un_partial_stated_rooms_from_stream(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+        """Get updates for the un-partial-stated rooms replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exist
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_un_partial_stated_rooms_from_stream_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+ sql = """
+ SELECT stream_id, room_id
+ FROM un_partial_stated_room_stream
+ WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
+ updates = [(row[0], (row[1],)) for row in txn]
+ limited = False
+ upto_token = current_id
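+            # If we hit the limit, only advance the returned token as far as the last
+            # row we are returning, so the caller can page through the rest.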
+ if len(updates) >= limit:
+ upto_token = updates[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_rooms_from_stream",
+ get_un_partial_stated_rooms_from_stream_txn,
+ )
+
class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
+ self._instance_name = hs.get_instance_name()
+
async def upsert_room_on_join(
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
) -> None:
@@ -1847,9 +1871,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"creator": room_creator,
"has_auth_chain_index": has_auth_chain_index,
},
- # rooms has a unique constraint on room_id, so no need to lock when doing an
- # emulated upsert.
- lock=False,
)
async def store_partial_state_room(
@@ -1970,9 +1991,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
- # rooms has a unique constraint on room_id, so no need to lock when doing an
- # emulated upsert.
- lock=False,
)
async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
@@ -2276,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self.is_room_blocked,
(room_id,),
)
+
+ async def clear_partial_state_room(self, room_id: str) -> bool:
+ """Clears the partial state flag for a room.
+
+ Args:
+ room_id: The room whose partial state flag is to be cleared.
+
+ Returns:
+ `True` if the partial state flag has been cleared successfully.
+
+ `False` if the partial state flag could not be cleared because the room
+ still contains events with partial state.
+ """
+ try:
+ async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
+ await self.db_pool.runInteraction(
+ "clear_partial_state_room",
+ self._clear_partial_state_room_txn,
+ room_id,
+ un_partial_state_room_stream_id,
+ )
+ return True
+ except self.db_pool.engine.module.IntegrityError as e:
+ # Assume that any `IntegrityError`s are due to partial state events.
+ logger.info(
+ "Exception while clearing lazy partial-state-room %s, retrying: %s",
+ room_id,
+ e,
+ )
+ return False
+
+ def _clear_partial_state_room_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ un_partial_state_room_stream_id: int,
+ ) -> None:
+ DatabasePool.simple_delete_txn(
+ txn,
+ table="partial_state_rooms_servers",
+ keyvalues={"room_id": room_id},
+ )
+ DatabasePool.simple_delete_one_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={"room_id": room_id},
+ )
+ self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_partial_state_servers_at_join, (room_id,)
+ )
+
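+        # Record the un-partial-stating in the stream so that other workers can be
+        # notified about it over replication.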
+ DatabasePool.simple_insert_txn(
+ txn,
+ "un_partial_stated_room_stream",
+ {
+ "stream_id": un_partial_state_room_stream_id,
+ "instance_name": self._instance_name,
+ "room_id": room_id,
+ },
+ )
+
+ # We now delete anything from `device_lists_remote_pending` with a
+ # stream ID less than the minimum
+ # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+ device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={},
+ retcol="MIN(device_lists_stream_id)",
+ allow_none=True,
+ )
+ if device_lists_stream_id is None:
+ # There are no rooms being currently partially joined, so we delete everything.
+ txn.execute("DELETE FROM device_lists_remote_pending")
+ else:
+ sql = """
+ DELETE FROM device_lists_remote_pending
+ WHERE stream_id <= ?
+ """
+ txn.execute(sql, (device_lists_stream_id,))
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 39e80f6f5b..131f357d04 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -44,6 +44,4 @@ class RoomBatchStore(SQLBaseStore):
table="event_to_state_groups",
keyvalues={"event_id": event_id},
values={"state_group": state_group_id, "event_id": event_id},
- # Unique constraint on event_id so we don't have to lock
- lock=False,
)
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index af7bebee80..c801a93b5b 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -33,8 +33,8 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
-from synapse.storage.state import StateFilter
from synapse.types import JsonDict, JsonMapping, StateMap
+from synapse.types.state import StateFilter
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.cancellation import cancellable
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 698d6f7515..14ef5b040d 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -26,6 +26,14 @@ from typing import (
cast,
)
+try:
+ # Figure out if ICU support is available for searching users.
+ import icu
+
+ USE_ICU = True
+except ModuleNotFoundError:
+ USE_ICU = False
+
from typing_extensions import TypedDict
from synapse.api.errors import StoreError
@@ -481,7 +489,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
- lock=False, # We're only inserter
)
if isinstance(self.database_engine, PostgresEngine):
@@ -511,7 +518,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
- lock=False, # We're only inserter
)
else:
# This should be unreachable.
@@ -888,7 +894,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
limited = len(results) > limit
- return {"limited": limited, "results": results}
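+        # Trim the results to the requested limit; `limited` already records whether
+        # more rows were found.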
+ return {"limited": limited, "results": results[0:limit]}
def _parse_query_sqlite(search_term: str) -> str:
@@ -902,7 +908,7 @@ def _parse_query_sqlite(search_term: str) -> str:
"""
# Pull out the individual words, discarding any non-word characters.
- results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+ results = _parse_words(search_term)
return " & ".join("(%s* OR %s)" % (result, result) for result in results)
@@ -912,12 +918,63 @@ def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
We use this so that we can add prefix matching, which isn't something
that is supported by default.
"""
-
- # Pull out the individual words, discarding any non-word characters.
- results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+ results = _parse_words(search_term)
both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
exact = " & ".join("%s" % (result,) for result in results)
prefix = " & ".join("%s:*" % (result,) for result in results)
return both, exact, prefix
+
+
+def _parse_words(search_term: str) -> List[str]:
+ """Split the provided search string into a list of its words.
+
+ If support for ICU (International Components for Unicode) is available, use it.
+ Otherwise, fall back to using a regex to detect word boundaries. This latter
+ solution works well enough for most latin-based languages, but doesn't work as well
+ with other languages.
+
+ Args:
+ search_term: The search string.
+
+ Returns:
+ A list of the words in the search string.
+ """
+ if USE_ICU:
+ return _parse_words_with_icu(search_term)
+
+ return re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+
+
+def _parse_words_with_icu(search_term: str) -> List[str]:
+ """Break down the provided search string into its individual words using ICU
+ (International Components for Unicode).
+
+ Args:
+ search_term: The search string.
+
+ Returns:
+ A list of the words in the search string.
+ """
+ results = []
+ breaker = icu.BreakIterator.createWordInstance(icu.Locale.getDefault())
+ breaker.setText(search_term)
+ i = 0
+ while True:
+ j = breaker.nextBoundary()
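+        # nextBoundary() returns a negative value (DONE) once the end of the
+        # string has been reached.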
+ if j < 0:
+ break
+
+ result = search_term[i:j]
+
+ # libicu considers spaces and punctuation between words as words, but we don't
+ # want to include those in results as they would result in syntax errors in SQL
+ # queries (e.g. "foo bar" would result in the search query including "foo & &
+ # bar").
+ if len(re.findall(r"([\w\-]+)", result, re.UNICODE)):
+ results.append(result)
+
+ i = j
+
+ return results
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 4a4ad0f492..d743282f13 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -22,8 +22,8 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
-from synapse.storage.state import StateFilter
from synapse.types import MutableStateMap, StateMap
+from synapse.types.state import StateFilter
from synapse.util.caches import intern_string
if TYPE_CHECKING:
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index f8cfcaca83..1a7232b276 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -25,10 +25,10 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
-from synapse.storage.state import StateFilter
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import MutableStateMap, StateKey, StateMap
+from synapse.types.state import StateFilter
from synapse.util.caches.descriptors import cached
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.cancellation import cancellable
diff --git a/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
new file mode 100644
index 0000000000..743196cfe3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
@@ -0,0 +1,32 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that a room has become un-partial-stated.
+CREATE TABLE un_partial_stated_room_stream(
+ -- Position in the stream
+ stream_id BIGINT PRIMARY KEY NOT NULL,
+
+ -- Which instance wrote this entry.
+ instance_name TEXT NOT NULL,
+
+ -- Which room has been un-partial-stated.
+ room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting a room, the database needs to be able to check here.
+-- This index is not unique because we can join a room multiple times in a server's lifetime,
+-- so the same room could be un-partial-stated multiple times!
+CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id);
diff --git a/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..c1aac0b385
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_room_stream_sequence;
+
+SELECT setval('un_partial_stated_room_stream_sequence', (
+ SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_room_stream
+));
diff --git a/synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql b/synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql
new file mode 100644
index 0000000000..afab1e4bb7
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql
@@ -0,0 +1,29 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+ -- Set up user directory staging tables.
+ (7322, 'populate_user_directory_createtables', '{}', NULL),
+ -- Run through each room and update the user directory according to who is in it.
+ (7322, 'populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'),
+ -- Insert all users into the user directory, if search_all_users is on.
+ (7322, 'populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'),
+ -- Clean up user directory staging tables.
+ (7322, 'populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users'),
+ -- Rebuild the room_stats_current and room_stats_state tables.
+ (7322, 'populate_stats_process_rooms', '{}', NULL),
+ -- Update the user_stats_current table.
+ (7322, 'populate_stats_process_users', '{}', NULL)
+ON CONFLICT (update_name) DO NOTHING;
diff --git a/synapse/types.py b/synapse/types/__init__.py
index f2d436ddc3..f2d436ddc3 100644
--- a/synapse/types.py
+++ b/synapse/types/__init__.py
diff --git a/synapse/storage/state.py b/synapse/types/state.py
index 0004d955b4..0004d955b4 100644
--- a/synapse/storage/state.py
+++ b/synapse/types/state.py
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 666f4b6895..1657459549 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -16,6 +16,7 @@ import logging
import math
from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union
+import attr
from sortedcontainers import SortedDict
from synapse.util import caches
@@ -26,14 +27,41 @@ logger = logging.getLogger(__name__)
EntityType = str
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class AllEntitiesChangedResult:
+ """Return type of `get_all_entities_changed`.
+
+ Callers must check that there was a cache hit, via `result.hit`, before
+ using the entities in `result.entities`.
+
+ This specifically does *not* implement helpers such as `__bool__` to ensure
+ that callers do the correct checks.
+ """
+
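+    # Private field (attrs strips the leading underscore from the __init__ argument);
+    # None indicates that the cache could not answer the query.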
+ _entities: Optional[List[EntityType]]
+
+ @property
+ def hit(self) -> bool:
+ return self._entities is not None
+
+ @property
+ def entities(self) -> List[EntityType]:
+ assert self._entities is not None
+ return self._entities
+
+
class StreamChangeCache:
- """Keeps track of the stream positions of the latest change in a set of entities.
+ """
+ Keeps track of the stream positions of the latest change in a set of entities.
+
+    The entity is typically a room ID or user ID, but can be any string.
- Typically the entity will be a room or user id.
+ Can be queried for whether a specific entity has changed after a stream position
+ or for a list of changed entities after a stream position. See the individual
+ methods for more information.
- Given a list of entities and a stream position, it will give a subset of
- entities that may have changed since that position. If position key is too
- old then the cache will simply return all given entities.
+    Only tracks up to a maximum cache size; any position earlier than the earliest
+    known stream position must be treated as unknown.
"""
def __init__(
@@ -45,16 +73,20 @@ class StreamChangeCache:
) -> None:
self._original_max_size: int = max_size
self._max_size = math.floor(max_size)
- self._entity_to_key: Dict[EntityType, int] = {}
- # map from stream id to the a set of entities which changed at that stream id.
+ # map from stream id to the set of entities which changed at that stream id.
self._cache: SortedDict[int, Set[EntityType]] = SortedDict()
+ # map from entity to the stream ID of the latest change for that entity.
+ #
+ # Must be kept in sync with _cache.
+ self._entity_to_key: Dict[EntityType, int] = {}
# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
# stream_pos for which we know _cache is valid.
#
self._earliest_known_stream_pos = current_stream_pos
+
self.name = name
self.metrics = caches.register_cache(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
@@ -82,22 +114,46 @@ class StreamChangeCache:
return False
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
- """Returns True if the entity may have been updated since stream_pos"""
+ """
+ Returns True if the entity may have been updated after stream_pos.
+
+ Args:
+ entity: The entity to check for changes.
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+            True if the entity may have been updated; this happens if:
+ * The given stream position is at or earlier than the earliest
+ known stream position.
+ * The given stream position is earlier than the latest change for
+ the entity.
+
+ False otherwise:
+ * The entity is unknown.
+ * The given stream position is at or later than the latest change
+ for the entity.
+ """
assert isinstance(stream_pos, int)
- if stream_pos < self._earliest_known_stream_pos:
+ # _cache is not valid at or before the earliest known stream position, so
+ # return that the entity has changed.
+ if stream_pos <= self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
+ # If the entity is unknown, it hasn't changed.
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
self.metrics.inc_hits()
return False
+        # This is a known entity; return True if the stream position is earlier
+        # than the last change.
if stream_pos < latest_entity_change_pos:
self.metrics.inc_misses()
return True
+ # Otherwise, the stream position is after the latest change: return false.
self.metrics.inc_hits()
return False
@@ -105,23 +161,35 @@ class StreamChangeCache:
self, entities: Collection[EntityType], stream_pos: int
) -> Union[Set[EntityType], FrozenSet[EntityType]]:
"""
- Returns subset of entities that have had new things since the given
- position. Entities unknown to the cache will be returned. If the
- position is too old it will just return the given list.
+ Returns the subset of the given entities that have had changes after the given position.
+
+ Entities unknown to the cache will be returned.
+
+ If the position is too old it will just return the given list.
+
+ Args:
+ entities: Entities to check for changes.
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+ A subset of entities which have changed after the given stream position.
+
+ This will be all entities if the given stream position is at or earlier
+ than the earliest known stream position.
"""
- changed_entities = self.get_all_entities_changed(stream_pos)
- if changed_entities is not None:
+ cache_result = self.get_all_entities_changed(stream_pos)
+ if cache_result.hit:
# We now do an intersection, trying to do so in the most efficient
# way possible (some of these sets are *large*). First check in the
- # given iterable is already set that we can reuse, otherwise we
+ # given iterable is already a set that we can reuse, otherwise we
# create a set of the *smallest* of the two iterables and call
# `intersection(..)` on it (this can be twice as fast as the reverse).
if isinstance(entities, (set, frozenset)):
- result = entities.intersection(changed_entities)
- elif len(changed_entities) < len(entities):
- result = set(changed_entities).intersection(entities)
+ result = entities.intersection(cache_result.entities)
+ elif len(cache_result.entities) < len(entities):
+ result = set(cache_result.entities).intersection(entities)
else:
- result = set(entities).intersection(changed_entities)
+ result = set(entities).intersection(cache_result.entities)
self.metrics.inc_hits()
else:
result = set(entities)
@@ -130,43 +198,76 @@ class StreamChangeCache:
return result
def has_any_entity_changed(self, stream_pos: int) -> bool:
- """Returns if any entity has changed"""
- assert type(stream_pos) is int
+ """
+ Returns true if any entity has changed after the given stream position.
- if not self._cache:
- # If the cache is empty, nothing can have changed.
- return False
+ Args:
+ stream_pos: The stream position to check for changes after.
- if stream_pos >= self._earliest_known_stream_pos:
- self.metrics.inc_hits()
- return self._cache.bisect_right(stream_pos) < len(self._cache)
- else:
+ Return:
+ True if any entity has changed after the given stream position or
+ if the given stream position is at or earlier than the earliest
+ known stream position.
+
+ False otherwise.
+ """
+ assert isinstance(stream_pos, int)
+
+ # _cache is not valid at or before the earliest known stream position, so
+ # return that an entity has changed.
+ if stream_pos <= self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
- def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
- """Returns all entities that have had new things since the given
- position. If the position is too old it will return None.
+ # If the cache is empty, nothing can have changed.
+ if not self._cache:
+ self.metrics.inc_misses()
+ return False
+
+ self.metrics.inc_hits()
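+        # `peekitem()` returns the entry with the largest key, i.e. the most recent
+        # stream position at which anything changed.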
+ return stream_pos < self._cache.peekitem()[0]
+
+ def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
+ """
+ Returns all entities that have had changes after the given position.
+
+        If the stream change cache does not go far enough back, i.e. the
+        position is too old, the result will indicate a cache miss (check `hit`).
Returns the entities in the order that they were changed.
+
+ Args:
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+            An object indicating whether we have the requested data cached; if so,
+            it includes the entities in the order they were changed.
"""
- assert type(stream_pos) is int
+ assert isinstance(stream_pos, int)
- if stream_pos < self._earliest_known_stream_pos:
- return None
+ # _cache is not valid at or before the earliest known stream position, so
+ # return None to mark that it is unknown if an entity has changed.
+ if stream_pos <= self._earliest_known_stream_pos:
+ return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
changed_entities.extend(self._cache[k])
- return changed_entities
+ return AllEntitiesChangedResult(changed_entities)
def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
- """Informs the cache that the entity has been changed at the given
- position.
"""
- assert type(stream_pos) is int
+ Informs the cache that the entity has been changed at the given position.
+ Args:
+ entity: The entity to mark as changed.
+ stream_pos: The stream position to update the entity to.
+ """
+ assert isinstance(stream_pos, int)
+
+ # For a change before _cache is valid (e.g. at or before the earliest known
+ # stream position) there's nothing to do.
if stream_pos <= self._earliest_known_stream_pos:
return
@@ -189,6 +290,11 @@ class StreamChangeCache:
self._evict()
def _evict(self) -> None:
+ """
+ Ensure the cache has not exceeded the maximum size.
+
+ Evicts entries until it is at the maximum size.
+ """
# if the cache is too big, remove entries
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
@@ -199,5 +305,12 @@ class StreamChangeCache:
def get_max_pos_of_last_change(self, entity: EntityType) -> int:
"""Returns an upper bound of the stream id of the last change to an
entity.
+
+ Args:
+ entity: The entity to check.
+
+ Return:
+ The stream position of the latest change for the given entity or
+            the earliest known stream position if the entity is unknown.
"""
return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index a0606851f7..39fab4fe06 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -15,7 +15,9 @@
import logging
from typing import Dict
-from twisted.web.resource import NoResource, Resource
+from twisted.web.resource import Resource
+
+from synapse.http.server import UnrecognizedRequestResource
logger = logging.getLogger(__name__)
@@ -49,7 +51,7 @@ def create_resource_tree(
for path_seg in full_path.split(b"/")[1:-1]:
if path_seg not in last_resource.listNames():
# resource doesn't exist, so make a "dummy resource"
- child_resource: Resource = NoResource()
+ child_resource: Resource = UnrecognizedRequestResource()
last_resource.putChild(path_seg, child_resource)
res_id = _resource_id(last_resource, path_seg)
resource_mappings[res_id] = child_resource
diff --git a/synapse/visibility.py b/synapse/visibility.py
index b443857571..e442de3173 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -26,8 +26,8 @@ from synapse.events.utils import prune_event
from synapse.logging.opentracing import trace
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
-from synapse.storage.state import StateFilter
from synapse.types import RetentionPolicy, StateMap, get_domain_from_id
+from synapse.types.state import StateFilter
from synapse.util import Clock
logger = logging.getLogger(__name__)
|