summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2022-12-12 17:48:27 +0100
committerMathieu Velten <mathieuv@matrix.org>2022-12-12 17:48:27 +0100
commit4ccade636e9903c78eeba89d8487a3b3e2ba4e2f (patch)
tree8a8e732fceda8923054506689833eb447067e711 /synapse
parentMerge remote-tracking branch 'origin/develop' into mv/unbind-callback (diff)
parentMove `StateFilter` to `synapse.types` (#14668) (diff)
downloadsynapse-4ccade636e9903c78eeba89d8487a3b3e2ba4e2f.tar.xz
Merge remote-tracking branch 'origin/develop' into mv/unbind-callback
Diffstat (limited to 'synapse')
-rw-r--r--synapse/_scripts/register_new_matrix_user.py5
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/errors.py6
-rw-r--r--synapse/api/room_versions.py48
-rw-r--r--synapse/app/generic_worker.py74
-rw-r--r--synapse/appservice/__init__.py10
-rw-r--r--synapse/appservice/api.py11
-rw-r--r--synapse/appservice/scheduler.py16
-rw-r--r--synapse/config/experimental.py11
-rw-r--r--synapse/config/push.py1
-rw-r--r--synapse/crypto/keyring.py107
-rw-r--r--synapse/events/builder.py2
-rw-r--r--synapse/events/snapshot.py2
-rw-r--r--synapse/federation/federation_client.py41
-rw-r--r--synapse/federation/sender/__init__.py2
-rw-r--r--synapse/federation/sender/per_destination_queue.py185
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/federation/transport/server/__init__.py8
-rw-r--r--synapse/federation/transport/server/federation.py3
-rw-r--r--synapse/handlers/appservice.py7
-rw-r--r--synapse/handlers/device.py2
-rw-r--r--synapse/handlers/devicemessage.py36
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/federation_event.py2
-rw-r--r--synapse/handlers/message.py8
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/presence.py16
-rw-r--r--synapse/handlers/receipts.py1
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/room_member.py2
-rw-r--r--synapse/handlers/search.py2
-rw-r--r--synapse/handlers/sync.py40
-rw-r--r--synapse/handlers/typing.py8
-rw-r--r--synapse/http/server.py19
-rw-r--r--synapse/logging/opentracing.py11
-rw-r--r--synapse/metrics/common_usage_metrics.py4
-rw-r--r--synapse/module_api/__init__.py2
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py40
-rw-r--r--synapse/push/mailer.py2
-rw-r--r--synapse/push/push_tools.py28
-rw-r--r--synapse/replication/tcp/client.py11
-rw-r--r--synapse/replication/tcp/streams/__init__.py3
-rw-r--r--synapse/replication/tcp/streams/partial_state.py48
-rw-r--r--synapse/res/templates/_base.html6
-rw-r--r--synapse/res/templates/notice_expiry.html6
-rw-r--r--synapse/res/templates/notif_mail.html6
-rw-r--r--synapse/rest/__init__.py59
-rw-r--r--synapse/rest/admin/rooms.py2
-rw-r--r--synapse/rest/client/account.py26
-rw-r--r--synapse/rest/client/devices.py10
-rw-r--r--synapse/rest/client/keys.py5
-rw-r--r--synapse/rest/client/receipts.py5
-rw-r--r--synapse/rest/client/register.py9
-rw-r--r--synapse/rest/client/room.py32
-rw-r--r--synapse/rest/client/sendtodevice.py1
-rw-r--r--synapse/rest/client/user_directory.py4
-rw-r--r--synapse/rest/client/versions.py3
-rw-r--r--synapse/rest/media/v1/media_repository.py4
-rw-r--r--synapse/state/__init__.py2
-rw-r--r--synapse/storage/background_updates.py55
-rw-r--r--synapse/storage/controllers/persist_events.py2
-rw-r--r--synapse/storage/controllers/state.py2
-rw-r--r--synapse/storage/database.py56
-rw-r--r--synapse/storage/databases/main/account_data.py8
-rw-r--r--synapse/storage/databases/main/appservice.py12
-rw-r--r--synapse/storage/databases/main/deviceinbox.py92
-rw-r--r--synapse/storage/databases/main/devices.py105
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py10
-rw-r--r--synapse/storage/databases/main/event_federation.py1
-rw-r--r--synapse/storage/databases/main/event_push_actions.py149
-rw-r--r--synapse/storage/databases/main/push_rule.py5
-rw-r--r--synapse/storage/databases/main/pusher.py6
-rw-r--r--synapse/storage/databases/main/receipts.py51
-rw-r--r--synapse/storage/databases/main/room.py243
-rw-r--r--synapse/storage/databases/main/room_batch.py2
-rw-r--r--synapse/storage/databases/main/state.py2
-rw-r--r--synapse/storage/databases/main/user_directory.py71
-rw-r--r--synapse/storage/databases/state/bg_updates.py2
-rw-r--r--synapse/storage/databases/state/store.py2
-rw-r--r--synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql32
-rw-r--r--synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres20
-rw-r--r--synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql29
-rw-r--r--synapse/types/__init__.py (renamed from synapse/types.py)0
-rw-r--r--synapse/types/state.py (renamed from synapse/storage/state.py)0
-rw-r--r--synapse/util/caches/stream_change_cache.py189
-rw-r--r--synapse/util/httpresourcetree.py6
-rw-r--r--synapse/visibility.py2
88 files changed, 1460 insertions, 718 deletions
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py

index 0c4504d5d8..2b74a40166 100644 --- a/synapse/_scripts/register_new_matrix_user.py +++ b/synapse/_scripts/register_new_matrix_user.py
@@ -222,6 +222,7 @@ def main() -> None: args = parser.parse_args() + config: Optional[Dict[str, Any]] = None if "config" in args and args.config: config = yaml.safe_load(args.config) @@ -229,7 +230,7 @@ def main() -> None: secret = args.shared_secret else: # argparse should check that we have either config or shared secret - assert config + assert config is not None secret = config.get("registration_shared_secret") secret_file = config.get("registration_shared_secret_path") @@ -244,7 +245,7 @@ def main() -> None: if args.server_url: server_url = args.server_url - elif config: + elif config is not None: server_url = _find_client_listener(config) if not server_url: server_url = _DEFAULT_SERVER_URL diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index bc04a0755b..89723d24fa 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py
@@ -230,6 +230,9 @@ class EventContentFields: # The authorising user for joining a restricted room. AUTHORISING_USER: Final = "join_authorised_via_users_server" + # an unspecced field added to to-device messages to identify them uniquely-ish + TO_DEVICE_MSGID: Final = "org.matrix.msgid" + class RoomTypes: """Understood values of the room_type field of m.room.create events.""" diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e2cfcea0f2..76ef12ed3a 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py
@@ -300,10 +300,8 @@ class InteractiveAuthIncompleteError(Exception): class UnrecognizedRequestError(SynapseError): """An error indicating we don't understand the request you're trying to make""" - def __init__( - self, msg: str = "Unrecognized request", errcode: str = Codes.UNRECOGNIZED - ): - super().__init__(400, msg, errcode) + def __init__(self, msg: str = "Unrecognized request", code: int = 400): + super().__init__(code, msg, Codes.UNRECOGNIZED) class NotFoundError(SynapseError): diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index e37acb0f1e..ac62011c9f 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Dict, Optional +from typing import Callable, Dict, List, Optional import attr @@ -51,6 +51,13 @@ class RoomDisposition: UNSTABLE = "unstable" +class PushRuleRoomFlag: + """Enum for listing possible MSC3931 room version feature flags, for push rules""" + + # MSC3932: Room version supports MSC1767 Extensible Events. + EXTENSIBLE_EVENTS = "org.matrix.msc3932.extensible_events" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class RoomVersion: """An object which describes the unique attributes of a room version.""" @@ -91,6 +98,12 @@ class RoomVersion: msc3787_knock_restricted_join_rule: bool # MSC3667: Enforce integer power levels msc3667_int_only_power_levels: bool + # MSC3931: Adds a push rule condition for "room version feature flags", making + # some push rules room version dependent. Note that adding a flag to this list + # is not enough to mark it "supported": the push rule evaluator also needs to + # support the flag. Unknown flags are ignored by the evaluator, making conditions + # fail if used. + msc3931_push_features: List[str] # values from PushRuleRoomFlag class RoomVersions: @@ -111,6 +124,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V2 = RoomVersion( "2", @@ -129,6 +143,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V3 = RoomVersion( "3", @@ -147,6 +162,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V4 = RoomVersion( "4", @@ -165,6 +181,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V5 = RoomVersion( "5", @@ -183,6 +200,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V6 = RoomVersion( "6", @@ -201,6 +219,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) MSC2176 = RoomVersion( "org.matrix.msc2176", @@ -219,6 +238,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V7 = RoomVersion( "7", @@ -237,6 +257,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V8 = RoomVersion( "8", @@ -255,6 +276,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V9 = RoomVersion( "9", @@ -273,6 +295,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) MSC3787 = RoomVersion( "org.matrix.msc3787", @@ -291,6 +314,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=False, + msc3931_push_features=[], ) V10 = RoomVersion( "10", @@ -309,6 +333,7 @@ class RoomVersions: msc2716_redactions=False, msc3787_knock_restricted_join_rule=True, msc3667_int_only_power_levels=True, + msc3931_push_features=[], ) MSC2716v4 = RoomVersion( "org.matrix.msc2716v4", @@ -327,6 +352,27 @@ class RoomVersions: msc2716_redactions=True, msc3787_knock_restricted_join_rule=False, msc3667_int_only_power_levels=False, + msc3931_push_features=[], + ) + MSC1767v10 = RoomVersion( + # MSC1767 (Extensible Events) based on room version "10" + "org.matrix.msc1767.10", + RoomDisposition.UNSTABLE, + EventFormatVersions.ROOM_V4_PLUS, + StateResolutionVersions.V2, + enforce_key_validity=True, + special_case_aliases_auth=False, + strict_canonicaljson=True, + limit_notifications_power_levels=True, + msc2176_redaction_rules=False, + msc3083_join_rules=True, + msc3375_redaction_rules=True, + msc2403_knocking=True, + msc2716_historical=False, + msc2716_redactions=False, + msc3787_knock_restricted_join_rule=True, + msc3667_int_only_power_levels=True, + msc3931_push_features=[PushRuleRoomFlag.EXTENSIBLE_EVENTS], ) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 46dc731696..bcc8abe20c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py
@@ -44,40 +44,8 @@ from synapse.http.server import JsonResource, OptionsResource from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource +from synapse.rest import ClientRestResource from synapse.rest.admin import register_servlets_for_media_repo -from synapse.rest.client import ( - account_data, - events, - initial_sync, - login, - presence, - profile, - push_rule, - read_marker, - receipts, - relations, - room, - room_batch, - room_keys, - sendtodevice, - sync, - tags, - user_directory, - versions, - voip, -) -from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet -from synapse.rest.client.devices import DevicesRestServlet -from synapse.rest.client.keys import ( - KeyChangesServlet, - KeyQueryServlet, - KeyUploadServlet, - OneTimeKeyServlet, -) -from synapse.rest.client.register import ( - RegisterRestServlet, - RegistrationTokenValidityRestServlet, -) from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyResource from synapse.rest.synapse.client import build_synapse_client_resource_tree @@ -200,45 +168,7 @@ class GenericWorkerServer(HomeServer): if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": - resource = JsonResource(self, canonical_json=False) - - RegisterRestServlet(self).register(resource) - RegistrationTokenValidityRestServlet(self).register(resource) - login.register_servlets(self, resource) - ThreepidRestServlet(self).register(resource) - WhoamiRestServlet(self).register(resource) - DevicesRestServlet(self).register(resource) - - # Read-only - KeyUploadServlet(self).register(resource) - KeyQueryServlet(self).register(resource) - KeyChangesServlet(self).register(resource) - OneTimeKeyServlet(self).register(resource) - - voip.register_servlets(self, resource) - push_rule.register_servlets(self, resource) - versions.register_servlets(self, resource) - - profile.register_servlets(self, resource) - - sync.register_servlets(self, resource) - events.register_servlets(self, resource) - room.register_servlets(self, resource, is_worker=True) - relations.register_servlets(self, resource) - room.register_deprecated_servlets(self, resource) - initial_sync.register_servlets(self, resource) - room_batch.register_servlets(self, resource) - room_keys.register_servlets(self, resource) - tags.register_servlets(self, resource) - account_data.register_servlets(self, resource) - receipts.register_servlets(self, resource) - read_marker.register_servlets(self, resource) - - sendtodevice.register_servlets(self, resource) - - user_directory.register_servlets(self, resource) - - presence.register_servlets(self, resource) + resource: Resource = ClientRestResource(self) resources[CLIENT_API_PREFIX] = resource diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 500bdde3a9..bf4e6c629b 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py
@@ -32,9 +32,9 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -# Type for the `device_one_time_key_counts` field in an appservice transaction +# Type for the `device_one_time_keys_count` field in an appservice transaction # user ID -> {device ID -> {algorithm -> count}} -TransactionOneTimeKeyCounts = Dict[str, Dict[str, Dict[str, int]]] +TransactionOneTimeKeysCount = Dict[str, Dict[str, Dict[str, int]]] # Type for the `device_unused_fallback_key_types` field in an appservice transaction # user ID -> {device ID -> [algorithm]} @@ -376,7 +376,7 @@ class AppServiceTransaction: events: List[EventBase], ephemeral: List[JsonDict], to_device_messages: List[JsonDict], - one_time_key_counts: TransactionOneTimeKeyCounts, + one_time_keys_count: TransactionOneTimeKeysCount, unused_fallback_keys: TransactionUnusedFallbackKeys, device_list_summary: DeviceListUpdates, ): @@ -385,7 +385,7 @@ class AppServiceTransaction: self.events = events self.ephemeral = ephemeral self.to_device_messages = to_device_messages - self.one_time_key_counts = one_time_key_counts + self.one_time_keys_count = one_time_keys_count self.unused_fallback_keys = unused_fallback_keys self.device_list_summary = device_list_summary @@ -402,7 +402,7 @@ class AppServiceTransaction: events=self.events, ephemeral=self.ephemeral, to_device_messages=self.to_device_messages, - one_time_key_counts=self.one_time_key_counts, + one_time_keys_count=self.one_time_keys_count, unused_fallback_keys=self.unused_fallback_keys, device_list_summary=self.device_list_summary, txn_id=self.id, diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 60774b240d..edafd433cd 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind from synapse.api.errors import CodeMessageException from synapse.appservice import ( ApplicationService, - TransactionOneTimeKeyCounts, + TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys, ) from synapse.events import EventBase @@ -262,7 +262,7 @@ class ApplicationServiceApi(SimpleHttpClient): events: List[EventBase], ephemeral: List[JsonDict], to_device_messages: List[JsonDict], - one_time_key_counts: TransactionOneTimeKeyCounts, + one_time_keys_count: TransactionOneTimeKeysCount, unused_fallback_keys: TransactionUnusedFallbackKeys, device_list_summary: DeviceListUpdates, txn_id: Optional[int] = None, @@ -310,10 +310,13 @@ class ApplicationServiceApi(SimpleHttpClient): # TODO: Update to stable prefixes once MSC3202 completes FCP merge if service.msc3202_transaction_extensions: - if one_time_key_counts: + if one_time_keys_count: body[ "org.matrix.msc3202.device_one_time_key_counts" - ] = one_time_key_counts + ] = one_time_keys_count + body[ + "org.matrix.msc3202.device_one_time_keys_count" + ] = one_time_keys_count if unused_fallback_keys: body[ "org.matrix.msc3202.device_unused_fallback_key_types" diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 430ffbcd1f..7b562795a3 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -64,7 +64,7 @@ from typing import ( from synapse.appservice import ( ApplicationService, ApplicationServiceState, - TransactionOneTimeKeyCounts, + TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys, ) from synapse.appservice.api import ApplicationServiceApi @@ -258,7 +258,7 @@ class _ServiceQueuer: ): return - one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None + one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None if ( @@ -269,7 +269,7 @@ class _ServiceQueuer: # for the users which are mentioned in this transaction, # as well as the appservice's sender. ( - one_time_key_counts, + one_time_keys_count, unused_fallback_keys, ) = await self._compute_msc3202_otk_counts_and_fallback_keys( service, events, ephemeral, to_device_messages_to_send @@ -281,7 +281,7 @@ class _ServiceQueuer: events, ephemeral, to_device_messages_to_send, - one_time_key_counts, + one_time_keys_count, unused_fallback_keys, device_list_summary, ) @@ -296,7 +296,7 @@ class _ServiceQueuer: events: Iterable[EventBase], ephemerals: Iterable[JsonDict], to_device_messages: Iterable[JsonDict], - ) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]: + ) -> Tuple[TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys]: """ Given a list of the events, ephemeral messages and to-device messages, - first computes a list of application services users that may have @@ -367,7 +367,7 @@ class _TransactionController: events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, to_device_messages: Optional[List[JsonDict]] = None, - one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None, + one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None, unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None, device_list_summary: Optional[DeviceListUpdates] = None, ) -> None: @@ -380,7 +380,7 @@ class _TransactionController: events: The persistent events to include in the transaction. ephemeral: The ephemeral events to include in the transaction. to_device_messages: The to-device messages to include in the transaction. - one_time_key_counts: Counts of remaining one-time keys for relevant + one_time_keys_count: Counts of remaining one-time keys for relevant appservice devices in the transaction. unused_fallback_keys: Lists of unused fallback keys for relevant appservice devices in the transaction. @@ -397,7 +397,7 @@ class _TransactionController: events=events, ephemeral=ephemeral or [], to_device_messages=to_device_messages or [], - one_time_key_counts=one_time_key_counts or {}, + one_time_keys_count=one_time_keys_count or {}, unused_fallback_keys=unused_fallback_keys or {}, device_list_summary=device_list_summary or DeviceListUpdates(), ) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index d4b71d1673..573fa0386f 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py
@@ -16,6 +16,7 @@ from typing import Any, Optional import attr +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.config._base import Config from synapse.types import JsonDict @@ -53,9 +54,6 @@ class ExperimentalConfig(Config): # MSC3266 (room summary api) self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False) - # MSC3030 (Jump to date API endpoint) - self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False) - # MSC2409 (this setting only relates to optionally sending to-device messages). # Presence, typing and read receipt EDUs are already sent to application services that # have opted in to receive them. If enabled, this adds to-device messages to that list. @@ -131,3 +129,10 @@ class ExperimentalConfig(Config): # MSC3912: Relation-based redactions. self.msc3912_enabled: bool = experimental.get("msc3912_enabled", False) + + # MSC1767 and friends: Extensible Events + self.msc1767_enabled: bool = experimental.get("msc1767_enabled", False) + if self.msc1767_enabled: + # Enable room version (and thus applicable push rules from MSC3931/3932) + version_id = RoomVersions.MSC1767v10.identifier + KNOWN_ROOM_VERSIONS[version_id] = RoomVersions.MSC1767v10 diff --git a/synapse/config/push.py b/synapse/config/push.py
index 979b128eae..3b5378e6ea 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py
@@ -26,6 +26,7 @@ class PushConfig(Config): def read_config(self, config: JsonDict, **kwargs: Any) -> None: push_config = config.get("push") or {} self.push_include_content = push_config.get("include_content", True) + self.enable_push = push_config.get("enabled", True) self.push_group_unread_count_by_room = push_config.get( "group_unread_count_by_room", True ) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ed15f88350..69310d9035 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py
@@ -14,7 +14,6 @@ import abc import logging -import urllib from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple import attr @@ -813,31 +812,27 @@ class ServerKeyFetcher(BaseV2KeyFetcher): results = {} - async def get_key(key_to_fetch_item: _FetchKeyRequest) -> None: + async def get_keys(key_to_fetch_item: _FetchKeyRequest) -> None: server_name = key_to_fetch_item.server_name - key_ids = key_to_fetch_item.key_ids try: - keys = await self.get_server_verify_key_v2_direct(server_name, key_ids) + keys = await self.get_server_verify_keys_v2_direct(server_name) results[server_name] = keys except KeyLookupError as e: - logger.warning( - "Error looking up keys %s from %s: %s", key_ids, server_name, e - ) + logger.warning("Error looking up keys from %s: %s", server_name, e) except Exception: - logger.exception("Error getting keys %s from %s", key_ids, server_name) + logger.exception("Error getting keys from %s", server_name) - await yieldable_gather_results(get_key, keys_to_fetch) + await yieldable_gather_results(get_keys, keys_to_fetch) return results - async def get_server_verify_key_v2_direct( - self, server_name: str, key_ids: Iterable[str] + async def get_server_verify_keys_v2_direct( + self, server_name: str ) -> Dict[str, FetchKeyResult]: """ Args: - server_name: - key_ids: + server_name: Server to request keys from Returns: Map from key ID to lookup result @@ -845,57 +840,41 @@ class ServerKeyFetcher(BaseV2KeyFetcher): Raises: KeyLookupError if there was a problem making the lookup """ - keys: Dict[str, FetchKeyResult] = {} - - for requested_key_id in key_ids: - # we may have found this key as a side-effect of asking for another. - if requested_key_id in keys: - continue - - time_now_ms = self.clock.time_msec() - try: - response = await self.client.get_json( - destination=server_name, - path="/_matrix/key/v2/server/" - + urllib.parse.quote(requested_key_id, safe=""), - ignore_backoff=True, - # we only give the remote server 10s to respond. It should be an - # easy request to handle, so if it doesn't reply within 10s, it's - # probably not going to. - # - # Furthermore, when we are acting as a notary server, we cannot - # wait all day for all of the origin servers, as the requesting - # server will otherwise time out before we can respond. - # - # (Note that get_json may make 4 attempts, so this can still take - # almost 45 seconds to fetch the headers, plus up to another 60s to - # read the response). - timeout=10000, - ) - except (NotRetryingDestination, RequestSendFailed) as e: - # these both have str() representations which we can't really improve - # upon - raise KeyLookupError(str(e)) - except HttpResponseException as e: - raise KeyLookupError("Remote server returned an error: %s" % (e,)) - - assert isinstance(response, dict) - if response["server_name"] != server_name: - raise KeyLookupError( - "Expected a response for server %r not %r" - % (server_name, response["server_name"]) - ) - - response_keys = await self.process_v2_response( - from_server=server_name, - response_json=response, - time_added_ms=time_now_ms, + time_now_ms = self.clock.time_msec() + try: + response = await self.client.get_json( + destination=server_name, + path="/_matrix/key/v2/server", + ignore_backoff=True, + # we only give the remote server 10s to respond. It should be an + # easy request to handle, so if it doesn't reply within 10s, it's + # probably not going to. + # + # Furthermore, when we are acting as a notary server, we cannot + # wait all day for all of the origin servers, as the requesting + # server will otherwise time out before we can respond. + # + # (Note that get_json may make 4 attempts, so this can still take + # almost 45 seconds to fetch the headers, plus up to another 60s to + # read the response). + timeout=10000, ) - await self.store.store_server_verify_keys( - server_name, - time_now_ms, - ((server_name, key_id, key) for key_id, key in response_keys.items()), + except (NotRetryingDestination, RequestSendFailed) as e: + # these both have str() representations which we can't really improve + # upon + raise KeyLookupError(str(e)) + except HttpResponseException as e: + raise KeyLookupError("Remote server returned an error: %s" % (e,)) + + assert isinstance(response, dict) + if response["server_name"] != server_name: + raise KeyLookupError( + "Expected a response for server %r not %r" + % (server_name, response["server_name"]) ) - keys.update(response_keys) - return keys + return await self.process_v2_response( + from_server=server_name, + response_json=response, + time_added_ms=time_now_ms, + ) diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index d62906043f..94dd1298e1 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py
@@ -28,8 +28,8 @@ from synapse.event_auth import auth_types_for_event from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict from synapse.state import StateHandler from synapse.storage.databases.main import DataStore -from synapse.storage.state import StateFilter from synapse.types import EventID, JsonDict +from synapse.types.state import StateFilter from synapse.util import Clock from synapse.util.stringutils import random_string diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 1c0e96bec7..6eaef8b57a 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py
@@ -23,7 +23,7 @@ from synapse.types import JsonDict, StateMap if TYPE_CHECKING: from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore - from synapse.storage.state import StateFilter + from synapse.types.state import StateFilter @attr.s(slots=True, auto_attribs=True) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c4c0bc7315..137cfb3346 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -771,17 +771,28 @@ class FederationClient(FederationBase): """ if synapse_error is None: synapse_error = e.to_synapse_error() - # There is no good way to detect an "unknown" endpoint. + # MSC3743 specifies that servers should return a 404 or 405 with an errcode + # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or + # to an unknown method, respectively. # - # Dendrite returns a 404 (with a body of "404 page not found"); - # Conduit returns a 404 (with no body); and Synapse returns a 400 - # with M_UNRECOGNIZED. - # - # This needs to be rather specific as some endpoints truly do return 404 - # errors. + # Older versions of servers don't properly handle this. This needs to be + # rather specific as some endpoints truly do return 404 errors. return ( - e.code == 404 and (not e.response or e.response == b"404 page not found") - ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED) + # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method. + (e.code == 404 or e.code == 405) + and ( + # Older Dendrites returned a text or empty body. + # Older Conduit returned an empty body. + not e.response + or e.response == b"404 page not found" + # The proper response JSON with M_UNRECOGNIZED errcode. + or synapse_error.errcode == Codes.UNRECOGNIZED + ) + ) or ( + # Older Synapses returned a 400 error. + e.code == 400 + and synapse_error.errcode == Codes.UNRECOGNIZED + ) async def _try_destination_list( self, @@ -1691,9 +1702,19 @@ class FederationClient(FederationBase): # to return events on *both* sides of the timestamp to # help reconcile the gap faster. _timestamp_to_event_from_destination, + # Since this endpoint is new, we should try other servers before giving up. + # We can safely remove this in a year (remove after 2023-11-16). + failover_on_unknown_endpoint=True, ) return timestamp_to_event_response - except SynapseError: + except SynapseError as e: + logger.warn( + "timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s", + room_id, + timestamp, + direction, + e, + ) return None async def _timestamp_to_event_from_destination( diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fc1d8c88a7..30ebd62883 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender): room_id = receipt.room_id # Work out which remote servers should be poked and poke them. - domains_set = await self._storage_controllers.state.get_current_hosts_in_room( + domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) domains = [ diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3ae5e8634c..ffc9d95ee7 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import JsonDict, ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.visibility import filter_events_for_server @@ -136,8 +136,11 @@ class PerDestinationQueue: # destination self._pending_presence: Dict[str, UserPresenceState] = {} - # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {} + # List of room_id -> receipt_type -> user_id -> receipt_dict, + # + # Each receipt can only have a single receipt per + # (room ID, receipt type, user ID, thread ID) tuple. + self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -202,17 +205,53 @@ class PerDestinationQueue: Args: receipt: receipt to be queued """ - self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( - receipt.receipt_type, {} - )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} + serialized_receipt: JsonDict = { + "event_ids": receipt.event_ids, + "data": receipt.data, + } + if receipt.thread_id is not None: + serialized_receipt["data"]["thread_id"] = receipt.thread_id + + # Find which EDU to add this receipt to. There's three situations depending + # on the (room ID, receipt type, user, thread ID) tuple: + # + # 1. If it fully matches, clobber the information. + # 2. If it is missing, add the information. + # 3. If the subset tuple of (room ID, receipt type, user) matches, check + # the next EDU (or add a new EDU). + for edu in self._pending_receipt_edus: + receipt_content = edu.setdefault(receipt.room_id, {}).setdefault( + receipt.receipt_type, {} + ) + # If this room ID, receipt type, user ID is not in this EDU, OR if + # the full tuple matches, use the current EDU. + if ( + receipt.user_id not in receipt_content + or receipt_content[receipt.user_id].get("thread_id") + == receipt.thread_id + ): + receipt_content[receipt.user_id] = serialized_receipt + break + + # If no matching EDU was found, create a new one. + else: + self._pending_receipt_edus.append( + { + receipt.room_id: { + receipt.receipt_type: {receipt.user_id: serialized_receipt} + } + } + ) def flush_read_receipts_for_room(self, room_id: str) -> None: - # if we don't have any read-receipts for this room, it may be that we've already - # sent them out, so we don't need to flush. - if room_id not in self._pending_rrs: - return - self._rrs_pending_flush = True - self.attempt_new_transaction() + # If there are any pending receipts for this room then force-flush them + # in a new transaction. + for edu in self._pending_receipt_edus: + if room_id in edu: + self._rrs_pending_flush = True + self.attempt_new_transaction() + # No use in checking remaining EDUs if the room was found. + break def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu @@ -351,7 +390,7 @@ class PerDestinationQueue: self._pending_edus = [] self._pending_edus_keyed = {} self._pending_presence = {} - self._pending_rrs = {} + self._pending_receipt_edus = [] self._start_catching_up() except FederationDeniedError as e: @@ -543,22 +582,27 @@ class PerDestinationQueue: self._destination, last_successful_stream_ordering ) - def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: - if not self._pending_rrs: + def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: + if not self._pending_receipt_edus: return if not force_flush and not self._rrs_pending_flush: # not yet time for this lot return - edu = Edu( - origin=self._server_name, - destination=self._destination, - edu_type=EduTypes.RECEIPT, - content=self._pending_rrs, - ) - self._pending_rrs = {} - self._rrs_pending_flush = False - yield edu + # Send at most limit EDUs for receipts. + for content in self._pending_receipt_edus[:limit]: + yield Edu( + origin=self._server_name, + destination=self._destination, + edu_type=EduTypes.RECEIPT, + content=content, + ) + self._pending_receipt_edus = self._pending_receipt_edus[limit:] + + # If there are still pending read-receipts, don't reset the pending flush + # flag. + if not self._pending_receipt_edus: + self._rrs_pending_flush = False def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus @@ -597,7 +641,7 @@ class PerDestinationQueue: if not message_id: continue - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id) edus = [ Edu( @@ -645,27 +689,61 @@ class _TransactionQueueManager: async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. - # We start by fetching device related EDUs, i.e device updates and to - # device messages. We have to keep 2 free slots for presence and rr_edus. - device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2 + # There's a maximum number of EDUs that can be sent with a transaction, + # generally device updates and to-device messages get priority, but we + # want to ensure that there's room for some other EDUs as well. + # + # This is done by: + # + # * Add a presence EDU, if one exists. + # * Add up-to a small limit of read receipt EDUs. + # * Add to-device EDUs, but leave some space for device list updates. + # * Add device list updates EDUs. + # * If there's any remaining room, add other EDUs. + pending_edus = [] + + # Add presence EDU. + if self.queue._pending_presence: + pending_edus.append( + Edu( + origin=self.queue._server_name, + destination=self.queue._destination, + edu_type=EduTypes.PRESENCE, + content={ + "push": [ + format_user_presence_state( + presence, self.queue._clock.time_msec() + ) + for presence in self.queue._pending_presence.values() + ] + }, + ) + ) + self.queue._pending_presence = {} - # We prioritize to-device messages so that existing encryption channels + # Add read receipt EDUs. + pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) + edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) + + # Next, prioritize to-device messages so that existing encryption channels # work. We also keep a few slots spare (by reducing the limit) so that # we can still trickle out some device list updates. ( to_device_edus, device_stream_id, - ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10) + ) = await self.queue._get_to_device_message_edus(edu_limit - 10) if to_device_edus: self._device_stream_id = device_stream_id else: self.queue._last_device_stream_id = device_stream_id - device_edu_limit -= len(to_device_edus) + pending_edus.extend(to_device_edus) + edu_limit -= len(to_device_edus) + # Add device list update EDUs. device_update_edus, dev_list_id = await self.queue._get_device_update_edus( - device_edu_limit + edu_limit ) if device_update_edus: @@ -673,40 +751,17 @@ class _TransactionQueueManager: else: self.queue._last_device_list_stream_id = dev_list_id - pending_edus = device_update_edus + to_device_edus - - # Now add the read receipt EDU. - pending_edus.extend(self.queue._get_rr_edus(force_flush=False)) - - # And presence EDU. - if self.queue._pending_presence: - pending_edus.append( - Edu( - origin=self.queue._server_name, - destination=self.queue._destination, - edu_type=EduTypes.PRESENCE, - content={ - "push": [ - format_user_presence_state( - presence, self.queue._clock.time_msec() - ) - for presence in self.queue._pending_presence.values() - ] - }, - ) - ) - self.queue._pending_presence = {} + pending_edus.extend(device_update_edus) + edu_limit -= len(device_update_edus) # Finally add any other types of EDUs if there is room. - pending_edus.extend( - self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) - ) - while ( - len(pending_edus) < MAX_EDUS_PER_TRANSACTION - and self.queue._pending_edus_keyed - ): + other_edus = self.queue._pop_pending_edus(edu_limit) + pending_edus.extend(other_edus) + edu_limit -= len(other_edus) + while edu_limit > 0 and self.queue._pending_edus_keyed: _, val = self.queue._pending_edus_keyed.popitem() pending_edus.append(val) + edu_limit -= 1 # Now we look for any PDUs to send, by getting up to 50 PDUs from the # queue @@ -717,8 +772,10 @@ class _TransactionQueueManager: # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: - pending_edus.extend(self.queue._get_rr_edus(force_flush=True)) + if edu_limit: + pending_edus.extend( + self.queue._get_receipt_edus(force_flush=True, limit=edu_limit) + ) if self._pdus: self._last_stream_ordering = self._pdus[ diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index a3cfc701cd..77f1f39cac 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -185,9 +185,8 @@ class TransportLayerClient: Raises: Various exceptions when the request fails """ - path = _create_path( - FEDERATION_UNSTABLE_PREFIX, - "/org.matrix.msc3030/timestamp_to_event/%s", + path = _create_v1_path( + "/timestamp_to_event/%s", room_id, ) diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 50623cd385..2725f53cf6 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py
@@ -25,7 +25,6 @@ from synapse.federation.transport.server._base import ( from synapse.federation.transport.server.federation import ( FEDERATION_SERVLET_CLASSES, FederationAccountStatusServlet, - FederationTimestampLookupServlet, ) from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import ( @@ -291,13 +290,6 @@ def register_servlets( ) for servletclass in SERVLET_GROUPS[servlet_group]: - # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled - if ( - servletclass == FederationTimestampLookupServlet - and not hs.config.experimental.msc3030_enabled - ): - continue - # Only allow the `/account_status` servlet if msc3720 is enabled if ( servletclass == FederationAccountStatusServlet diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 205fd16daa..53e77b4bb6 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py
@@ -218,14 +218,13 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet): `dir` can be `f` or `b` to indicate forwards and backwards in time from the given timestamp. - GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction> + GET /_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction> { "event_id": ... } """ PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?" - PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030" async def on_GET( self, diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 66f5b8d108..5d1d21cdc8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -578,9 +578,6 @@ class ApplicationServicesHandler: device_id, ), messages in recipient_device_to_messages.items(): for message_json in messages: - # Remove 'message_id' from the to-device message, as it's an internal ID - message_json.pop("message_id", None) - message_payload.append( { "to_user_id": user_id, @@ -615,8 +612,8 @@ class ApplicationServicesHandler: ) # Fetch the users who have modified their device list since then. - users_with_changed_device_lists = ( - await self.store.get_users_whose_devices_changed(from_key, to_key=new_key) + users_with_changed_device_lists = await self.store.get_all_devices_changed( + from_key, to_key=new_key ) # Filter out any users the application service is not interested in diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b1e55e1b9e..d4750a32e6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): # Check if we are partially joining any rooms. If so we need to store # all device list updates so that we can handle them correctly once we # know who is in the room. - # TODO(faster joins): this fetches and processes a bunch of data that we don't + # TODO(faster_joins): this fetches and processes a bunch of data that we don't # use. Could be replaced by a tighter query e.g. # SELECT EXISTS(SELECT 1 FROM partial_state_rooms) partial_rooms = await self.store.get_partial_state_room_resync_info() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 444c08bc2e..75e89850f5 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py
@@ -15,7 +15,7 @@ import logging from typing import TYPE_CHECKING, Any, Dict -from synapse.api.constants import EduTypes, ToDeviceEventTypes +from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes from synapse.api.errors import SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background @@ -216,14 +216,24 @@ class DeviceMessageHandler: """ sender_user_id = requester.user.to_string() - message_id = random_string(16) - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) - - log_kv({"number_of_to_device_messages": len(messages)}) - set_tag("sender", sender_user_id) + set_tag(SynapseTags.TO_DEVICE_TYPE, message_type) + set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id) local_messages = {} remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {} for user_id, by_device in messages.items(): + # add an opentracing log entry for each message + for device_id, message_content in by_device.items(): + log_kv( + { + "event": "send_to_device_message", + "user_id": user_id, + "device_id": device_id, + EventContentFields.TO_DEVICE_MSGID: message_content.get( + EventContentFields.TO_DEVICE_MSGID + ), + } + ) + # Ratelimit local cross-user key requests by the sending device. if ( message_type == ToDeviceEventTypes.RoomKeyRequest @@ -233,6 +243,7 @@ class DeviceMessageHandler: requester, (sender_user_id, requester.device_id) ) if not allowed: + log_kv({"message": f"dropping key requests to {user_id}"}) logger.info( "Dropping room_key_request from %s to %s due to rate limit", sender_user_id, @@ -247,18 +258,11 @@ class DeviceMessageHandler: "content": message_content, "type": message_type, "sender": sender_user_id, - "message_id": message_id, } for device_id, message_content in by_device.items() } if messages_by_device: local_messages[user_id] = messages_by_device - log_kv( - { - "user_id": user_id, - "device_id": list(messages_by_device), - } - ) else: destination = get_domain_from_id(user_id) remote_messages.setdefault(destination, {})[user_id] = by_device @@ -267,7 +271,11 @@ class DeviceMessageHandler: remote_edu_contents = {} for destination, messages in remote_messages.items(): - log_kv({"destination": destination}) + # The EDU contains a "message_id" property which is used for + # idempotence. Make up a random one. + message_id = random_string(16) + log_kv({"destination": destination, "message_id": message_id}) + remote_edu_contents[destination] = { "messages": messages, "sender": sender_user_id, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d92582fd5c..b2784d7333 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -70,8 +70,8 @@ from synapse.replication.http.federation import ( ) from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.state import StateFilter from synapse.types import JsonDict, get_domain_from_id +from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -152,6 +152,7 @@ class FederationHandler: self._federation_event_handler = hs.get_federation_event_handler() self._device_handler = hs.get_device_handler() self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() + self._notifier = hs.get_notifier() self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs @@ -1692,6 +1693,9 @@ class FederationHandler: self._storage_controllers.state.notify_room_un_partial_stated( room_id ) + # Poke the notifier so that other workers see the write to + # the un-partial-stated rooms stream. + self._notifier.notify_replication() # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f7223b03c3..d2facdab60 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -75,7 +75,6 @@ from synapse.replication.http.federation import ( from synapse.state import StateResolutionStore from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.state import StateFilter from synapse.types import ( PersistedEventPosition, RoomStreamToken, @@ -83,6 +82,7 @@ from synapse.types import ( UserID, get_domain_from_id, ) +from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4cf593cfdc..d6e90ef259 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -59,7 +59,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.replication.http.send_events import ReplicationSendEventsRestServlet from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.state import StateFilter from synapse.types import ( MutableStateMap, PersistedEventPosition, @@ -70,6 +69,7 @@ from synapse.types import ( UserID, create_requester, ) +from synapse.types.state import StateFilter from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError from synapse.util.async_helpers import Linearizer, gather_results from synapse.util.caches.expiringcache import ExpiringCache @@ -1135,11 +1135,13 @@ class EventCreationHandler: ) state_events = await self.store.get_events_as_list(state_event_ids) # Create a StateMap[str] - state_map = {(e.type, e.state_key): e.event_id for e in state_events} + current_state_ids = { + (e.type, e.state_key): e.event_id for e in state_events + } # Actually strip down and only use the necessary auth events auth_event_ids = self._event_auth_handler.compute_auth_events( event=temp_event, - current_state_ids=state_map, + current_state_ids=current_state_ids, for_verification=False, ) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index c572508a02..8c8ff18a1a 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py
@@ -27,9 +27,9 @@ from synapse.handlers.room import ShutdownRoomResponse from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin -from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, StreamKeyType +from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf08737d11..2af90b25a3 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): if from_key is not None: # First get all users that have had a presence update - updated_users = stream_change_cache.get_all_entities_changed(from_key) + result = stream_change_cache.get_all_entities_changed(from_key) # Cross-reference users we're interested in with those that have had updates. - if updated_users is not None: + if result.hit: + updated_users = result.entities + # If we have the full list of changes for presence we can # simply check which ones share a room with the user. get_updates_counter.labels("stream").inc() @@ -1764,14 +1766,14 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): Returns: A list of presence states for the given user to receive. """ + updated_users = None if from_key: # Only return updates since the last sync - updated_users = self.store.presence_stream_cache.get_all_entities_changed( - from_key - ) - if not updated_users: - updated_users = [] + result = self.store.presence_stream_cache.get_all_entities_changed(from_key) + if result.hit: + updated_users = result.entities + if updated_users is not None: # Get the actual presence update for each change users_to_state = await self.get_presence_handler().current_state_for_users( updated_users diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index ac01582442..6a4fed1156 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py
@@ -92,7 +92,6 @@ class ReceiptsHandler: continue # Check if these receipts apply to a thread. - thread_id = None data = user_values.get("data", {}) thread_id = data.get("thread_id") # If the thread ID is invalid, consider it missing. diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 6307fa9c5d..c611efb760 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py
@@ -46,8 +46,8 @@ from synapse.replication.http.register import ( ReplicationRegisterServlet, ) from synapse.spam_checker_api import RegistrationBehaviour -from synapse.storage.state import StateFilter from synapse.types import RoomAlias, UserID, create_requester +from synapse.types.state import StateFilter if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6dcfd86fdf..f81241c2b3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -62,7 +62,6 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents from synapse.handlers.relations import BundledAggregations from synapse.module_api import NOT_SPAM from synapse.rest.admin._base import assert_user_is_admin -from synapse.storage.state import StateFilter from synapse.streams import EventSource from synapse.types import ( JsonDict, @@ -77,6 +76,7 @@ from synapse.types import ( UserID, create_requester, ) +from synapse.types.state import StateFilter from synapse.util import stringutils from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_and_validate_server_name diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6ad2b38b8f..0c39e852a1 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -34,7 +34,6 @@ from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.logging import opentracing from synapse.module_api import NOT_SPAM -from synapse.storage.state import StateFilter from synapse.types import ( JsonDict, Requester, @@ -45,6 +44,7 @@ from synapse.types import ( create_requester, get_domain_from_id, ) +from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_left_room diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index bcab98c6d5..33115ce488 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py
@@ -23,8 +23,8 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import NotFoundError, SynapseError from synapse.api.filtering import Filter from synapse.events import EventBase -from synapse.storage.state import StateFilter from synapse.types import JsonDict, StreamKeyType, UserID +from synapse.types.state import StateFilter from synapse.visibility import filter_events_for_client if TYPE_CHECKING: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 259456b55d..7d6a653747 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -31,19 +31,24 @@ from typing import ( import attr from prometheus_client import Counter -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.handlers.relations import BundledAggregations from synapse.logging.context import current_context -from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + trace, +) from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.roommember import MemberSummary -from synapse.storage.state import StateFilter from synapse.types import ( DeviceListUpdates, JsonDict, @@ -55,6 +60,7 @@ from synapse.types import ( StreamToken, UserID, ) +from synapse.types.state import StateFilter from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache @@ -1426,14 +1432,14 @@ class SyncHandler: logger.debug("Fetching OTK data") device_id = sync_config.device_id - one_time_key_counts: JsonDict = {} + one_time_keys_count: JsonDict = {} unused_fallback_key_types: List[str] = [] if device_id: # TODO: We should have a way to let clients differentiate between the states of: # * no change in OTK count since the provided since token # * the server has zero OTKs left for this device # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 - one_time_key_counts = await self.store.count_e2e_one_time_keys( + one_time_keys_count = await self.store.count_e2e_one_time_keys( user_id, device_id ) unused_fallback_key_types = ( @@ -1463,7 +1469,7 @@ class SyncHandler: archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, - device_one_time_keys_count=one_time_key_counts, + device_one_time_keys_count=one_time_keys_count, device_unused_fallback_key_types=unused_fallback_key_types, next_batch=sync_result_builder.now_token, ) @@ -1528,10 +1534,12 @@ class SyncHandler: # # If we don't have that info cached then we get all the users that # share a room with our user and check if those users have changed. - changed_users = self.store.get_cached_device_list_changes( + cache_result = self.store.get_cached_device_list_changes( since_token.device_list_key ) - if changed_users is not None: + if cache_result.hit: + changed_users = cache_result.entities + result = await self.store.get_rooms_for_users(changed_users) for changed_user_id, entries in result.items(): @@ -1584,6 +1592,7 @@ class SyncHandler: else: return DeviceListUpdates() + @trace async def _generate_sync_entry_for_to_device( self, sync_result_builder: "SyncResultBuilder" ) -> None: @@ -1603,11 +1612,16 @@ class SyncHandler: ) for message in messages: - # We pop here as we shouldn't be sending the message ID down - # `/sync` - message_id = message.pop("message_id", None) - if message_id: - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + log_kv( + { + "event": "to_device_message", + "sender": message["sender"], + "type": message["type"], + EventContentFields.TO_DEVICE_MSGID: message["content"].get( + EventContentFields.TO_DEVICE_MSGID + ), + } + ) logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a0ea719430..3f656ea4f5 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py
@@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler): if last_id == current_id: return [], current_id, False - changed_rooms: Optional[ - Iterable[str] - ] = self._typing_stream_change_cache.get_all_entities_changed(last_id) + result = self._typing_stream_change_cache.get_all_entities_changed(last_id) - if changed_rooms is None: + if result.hit: + changed_rooms: Iterable[str] = result.entities + else: changed_rooms = self._room_serials rows = [] diff --git a/synapse/http/server.py b/synapse/http/server.py
index 051a1899a0..2563858f3c 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py
@@ -577,7 +577,24 @@ def _unrecognised_request_handler(request: Request) -> NoReturn: Args: request: Unused, but passed in to match the signature of ServletCallback. """ - raise UnrecognizedRequestError() + raise UnrecognizedRequestError(code=404) + + +class UnrecognizedRequestResource(resource.Resource): + """ + Similar to twisted.web.resource.NoResource, but returns a JSON 404 with an + errcode of M_UNRECOGNIZED. + """ + + def render(self, request: SynapseRequest) -> int: + f = failure.Failure(UnrecognizedRequestError(code=404)) + return_json_error(f, request, None) + # A response has already been sent but Twisted requires either NOT_DONE_YET + # or the response bytes as a return value. + return NOT_DONE_YET + + def getChild(self, name: str, request: Request) -> resource.Resource: + return self class RootRedirect(resource.Resource): diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index b69060854f..a705af8356 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py
@@ -292,8 +292,15 @@ logger = logging.getLogger(__name__) class SynapseTags: - # The message ID of any to_device message processed - TO_DEVICE_MESSAGE_ID = "to_device.message_id" + # The message ID of any to_device EDU processed + TO_DEVICE_EDU_ID = "to_device.edu_id" + + # Details about to-device messages + TO_DEVICE_TYPE = "to_device.type" + TO_DEVICE_SENDER = "to_device.sender" + TO_DEVICE_RECIPIENT = "to_device.recipient" + TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device" + TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID # Whether the sync response has new data to be returned to the client. SYNC_RESULT = "sync.new_data" diff --git a/synapse/metrics/common_usage_metrics.py b/synapse/metrics/common_usage_metrics.py
index 0a22ea3d92..6e05b043d3 100644 --- a/synapse/metrics/common_usage_metrics.py +++ b/synapse/metrics/common_usage_metrics.py
@@ -54,7 +54,9 @@ class CommonUsageMetricsManager: async def setup(self) -> None: """Keep the gauges for common usage metrics up to date.""" - await self._update_gauges() + run_as_background_process( + desc="common_usage_metrics_update_gauges", func=self._update_gauges + ) self._clock.looping_call( run_as_background_process, 5 * 60 * 1000, diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 38c71b8b43..6da9e1d8b2 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py
@@ -112,7 +112,6 @@ from synapse.storage.background_updates import ( ) from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo -from synapse.storage.state import StateFilter from synapse.types import ( DomainSpecificString, JsonDict, @@ -125,6 +124,7 @@ from synapse.types import ( UserProfile, create_requester, ) +from synapse.types.state import StateFilter from synapse.util import Clock from synapse.util.async_helpers import maybe_awaitable from synapse.util.caches.descriptors import CachedFunction, cached diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 75b7e126ca..36e5b327ef 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -29,13 +29,14 @@ from typing import ( from prometheus_client import Counter from synapse.api.constants import MAIN_TIMELINE, EventTypes, Membership, RelationTypes +from synapse.api.room_versions import PushRuleRoomFlag, RoomVersion from synapse.event_auth import auth_types_for_event, get_user_power_level from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext from synapse.state import POWER_KEY from synapse.storage.databases.main.roommember import EventIdMembership -from synapse.storage.state import StateFilter from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator +from synapse.types.state import StateFilter from synapse.util.caches import register_cache from synapse.util.metrics import measure_func from synapse.visibility import filter_event_for_clients_with_state @@ -105,6 +106,7 @@ class BulkPushRuleEvaluator: self.store = hs.get_datastores().main self.clock = hs.get_clock() self._event_auth_handler = hs.get_event_auth_handler() + self.should_calculate_push_rules = self.hs.config.push.enable_push self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled @@ -268,6 +270,8 @@ class BulkPushRuleEvaluator: for each event, check if the message should increment the unread count, and insert the results into the event_push_actions_staging table. """ + if not self.should_calculate_push_rules: + return # For batched events the power level events may not have been persisted yet, # so we pass in the batched events. Thus if the event cannot be found in the # database we can check in the batch. @@ -338,13 +342,19 @@ class BulkPushRuleEvaluator: for user_id, level in notification_levels.items(): notification_levels[user_id] = int(level) + room_version_features = event.room_version.msc3931_push_features + if not room_version_features: + room_version_features = [] + evaluator = PushRuleEvaluator( - _flatten_dict(event), + _flatten_dict(event, room_version=event.room_version), room_member_count, sender_power_level, notification_levels, related_events, self._related_event_match_enabled, + room_version_features, + self.hs.config.experimental.msc1767_enabled, # MSC3931 flag ) users = rules_by_user.keys() @@ -420,6 +430,7 @@ StateGroup = Union[object, int] def _flatten_dict( d: Union[EventBase, Mapping[str, Any]], + room_version: Optional[RoomVersion] = None, prefix: Optional[List[str]] = None, result: Optional[Dict[str, str]] = None, ) -> Dict[str, str]: @@ -431,6 +442,31 @@ def _flatten_dict( if isinstance(value, str): result[".".join(prefix + [key])] = value.lower() elif isinstance(value, Mapping): + # do not set `room_version` due to recursion considerations below _flatten_dict(value, prefix=(prefix + [key]), result=result) + # `room_version` should only ever be set when looking at the top level of an event + if ( + room_version is not None + and PushRuleRoomFlag.EXTENSIBLE_EVENTS in room_version.msc3931_push_features + and isinstance(d, EventBase) + ): + # Room supports extensible events: replace `content.body` with the plain text + # representation from `m.markup`, as per MSC1767. + markup = d.get("content").get("m.markup") + if room_version.identifier.startswith("org.matrix.msc1767."): + markup = d.get("content").get("org.matrix.msc1767.markup") + if markup is not None and isinstance(markup, list): + text = "" + for rep in markup: + if not isinstance(rep, dict): + # invalid markup - skip all processing + break + if rep.get("mimetype", "text/plain") == "text/plain": + rep_text = rep.get("body") + if rep_text is not None and isinstance(rep_text, str): + text = rep_text.lower() + break + result["content.body"] = text + return result diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index c2575ba3d9..93b255ced5 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py
@@ -37,8 +37,8 @@ from synapse.push.push_types import ( TemplateVars, ) from synapse.storage.databases.main.event_push_actions import EmailPushAction -from synapse.storage.state import StateFilter from synapse.types import StateMap, UserID +from synapse.types.state import StateFilter from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index edeba27a45..7ee07e4bee 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py
@@ -17,7 +17,6 @@ from synapse.events import EventBase from synapse.push.presentable_names import calculate_room_name, name_from_member_event from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore -from synapse.util.async_helpers import concurrently_execute async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int: @@ -26,23 +25,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - badge = len(invites) - room_notifs = [] - - async def get_room_unread_count(room_id: str) -> None: - room_notifs.append( - await store.get_unread_event_push_actions_by_room_for_user( - room_id, - user_id, - ) - ) - - await concurrently_execute(get_room_unread_count, joins, 10) - - for notifs in room_notifs: - # Combine the counts from all the threads. - notify_count = notifs.main_timeline.notify_count + sum( - n.notify_count for n in notifs.threads.values() - ) + room_to_count = await store.get_unread_counts_by_room_for_user(user_id) + for room_id, notify_count in room_to_count.items(): + # room_to_count may include rooms which the user has left, + # ignore those. + if room_id not in joins: + continue if notify_count == 0: continue @@ -51,8 +39,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - # return one badge count per conversation badge += 1 else: - # increment the badge count by the number of unread messages in the room + # Increase badge by number of notifications in room + # NOTE: this includes threaded and unthreaded notifications. badge += notify_count + return badge diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 18252a2958..b4dad47b45 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -36,12 +36,14 @@ from synapse.replication.tcp.streams import ( TagAccountDataStream, ToDeviceStream, TypingStream, + UnPartialStatedRoomStream, ) from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, EventsStreamRow, ) +from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -117,6 +119,7 @@ class ReplicationDataHandler: self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() self._typing_handler = hs.get_typing_handler() + self._state_storage_controller = hs.get_storage_controllers().state self._notify_pushers = hs.config.worker.start_pushers self._pusher_pool = hs.get_pusherpool() @@ -236,6 +239,14 @@ class ReplicationDataHandler: self.notifier.notify_user_joined_room( row.data.event_id, row.data.room_id ) + elif stream_name == UnPartialStatedRoomStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedRoomStreamRow) + + # Wake up any tasks waiting for the room to be un-partial-stated. + self._state_storage_controller.notify_room_un_partial_stated( + row.room_id + ) await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index b1cd55bf6f..8575666d9c 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import ( ) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream STREAMS_MAP = { stream.NAME: stream @@ -61,6 +62,7 @@ STREAMS_MAP = { TagAccountDataStream, AccountDataStream, UserSignatureStream, + UnPartialStatedRoomStream, ) } @@ -80,4 +82,5 @@ __all__ = [ "TagAccountDataStream", "AccountDataStream", "UserSignatureStream", + "UnPartialStatedRoomStream", ] diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py new file mode 100644
index 0000000000..18f087ffa2 --- /dev/null +++ b/synapse/replication/tcp/streams/partial_state.py
@@ -0,0 +1,48 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +import attr + +from synapse.replication.tcp.streams import Stream +from synapse.replication.tcp.streams._base import current_token_without_instance + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class UnPartialStatedRoomStreamRow: + # ID of the room that has been un-partial-stated. + room_id: str + + +class UnPartialStatedRoomStream(Stream): + """ + Stream to notify about rooms becoming un-partial-stated; + that is, when the background sync finishes such that we now have full state for + the room. + """ + + NAME = "un_partial_stated_room" + ROW_TYPE = UnPartialStatedRoomStreamRow + + def __init__(self, hs: "HomeServer"): + store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + # TODO(faster_joins, multiple writers): we need to account for instance names + current_token_without_instance(store.get_un_partial_stated_rooms_token), + store.get_un_partial_stated_rooms_from_stream, + ) diff --git a/synapse/res/templates/_base.html b/synapse/res/templates/_base.html
index 46439fce6a..4b5cc7bcb6 100644 --- a/synapse/res/templates/_base.html +++ b/synapse/res/templates/_base.html
@@ -13,13 +13,13 @@ <body> <header class="mx_Header"> {% if app_name == "Riot" %} - <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> + <img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> {% elif app_name == "Vector" %} - <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> + <img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> {% elif app_name == "Element" %} <img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/> {% else %} - <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/> + <img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/> {% endif %} </header> diff --git a/synapse/res/templates/notice_expiry.html b/synapse/res/templates/notice_expiry.html
index 406397aaca..f62038e111 100644 --- a/synapse/res/templates/notice_expiry.html +++ b/synapse/res/templates/notice_expiry.html
@@ -21,13 +21,13 @@ </td> <td class="logo"> {% if app_name == "Riot" %} - <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> + <img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> {% elif app_name == "Vector" %} - <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> + <img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> {% elif app_name == "Element" %} <img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/> {% else %} - <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/> + <img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/> {% endif %} </td> </tr> diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html
index 2add9dd859..7da0fff5e9 100644 --- a/synapse/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html
@@ -22,13 +22,13 @@ </td> <td class="logo"> {%- if app_name == "Riot" %} - <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> + <img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> {%- elif app_name == "Vector" %} - <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> + <img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> {%- elif app_name == "Element" %} <img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/> {%- else %} - <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/> + <img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/> {%- endif %} </td> </tr> diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 28542cd774..14c4e6ebbb 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py
@@ -29,7 +29,7 @@ from synapse.rest.client import ( initial_sync, keys, knock, - login as v1_login, + login, login_token_request, logout, mutual_rooms, @@ -82,6 +82,10 @@ class ClientRestResource(JsonResource): @staticmethod def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None: + # Some servlets are only registered on the main process (and not worker + # processes). + is_main_process = hs.config.worker.worker_app is None + versions.register_servlets(hs, client_resource) # Deprecated in r0 @@ -92,45 +96,58 @@ class ClientRestResource(JsonResource): events.register_servlets(hs, client_resource) room.register_servlets(hs, client_resource) - v1_login.register_servlets(hs, client_resource) + login.register_servlets(hs, client_resource) profile.register_servlets(hs, client_resource) presence.register_servlets(hs, client_resource) - directory.register_servlets(hs, client_resource) + if is_main_process: + directory.register_servlets(hs, client_resource) voip.register_servlets(hs, client_resource) - pusher.register_servlets(hs, client_resource) + if is_main_process: + pusher.register_servlets(hs, client_resource) push_rule.register_servlets(hs, client_resource) - logout.register_servlets(hs, client_resource) + if is_main_process: + logout.register_servlets(hs, client_resource) sync.register_servlets(hs, client_resource) - filter.register_servlets(hs, client_resource) + if is_main_process: + filter.register_servlets(hs, client_resource) account.register_servlets(hs, client_resource) register.register_servlets(hs, client_resource) - auth.register_servlets(hs, client_resource) + if is_main_process: + auth.register_servlets(hs, client_resource) receipts.register_servlets(hs, client_resource) read_marker.register_servlets(hs, client_resource) room_keys.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) - tokenrefresh.register_servlets(hs, client_resource) + if is_main_process: + tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) - report_event.register_servlets(hs, client_resource) - openid.register_servlets(hs, client_resource) - notifications.register_servlets(hs, client_resource) + if is_main_process: + report_event.register_servlets(hs, client_resource) + openid.register_servlets(hs, client_resource) + notifications.register_servlets(hs, client_resource) devices.register_servlets(hs, client_resource) - thirdparty.register_servlets(hs, client_resource) + if is_main_process: + thirdparty.register_servlets(hs, client_resource) sendtodevice.register_servlets(hs, client_resource) user_directory.register_servlets(hs, client_resource) - room_upgrade_rest_servlet.register_servlets(hs, client_resource) + if is_main_process: + room_upgrade_rest_servlet.register_servlets(hs, client_resource) room_batch.register_servlets(hs, client_resource) - capabilities.register_servlets(hs, client_resource) - account_validity.register_servlets(hs, client_resource) + if is_main_process: + capabilities.register_servlets(hs, client_resource) + account_validity.register_servlets(hs, client_resource) relations.register_servlets(hs, client_resource) - password_policy.register_servlets(hs, client_resource) - knock.register_servlets(hs, client_resource) + if is_main_process: + password_policy.register_servlets(hs, client_resource) + knock.register_servlets(hs, client_resource) # moving to /_synapse/admin - admin.register_servlets_for_client_rest_resource(hs, client_resource) + if is_main_process: + admin.register_servlets_for_client_rest_resource(hs, client_resource) # unstable - mutual_rooms.register_servlets(hs, client_resource) - login_token_request.register_servlets(hs, client_resource) - rendezvous.register_servlets(hs, client_resource) + if is_main_process: + mutual_rooms.register_servlets(hs, client_resource) + login_token_request.register_servlets(hs, client_resource) + rendezvous.register_servlets(hs, client_resource) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 747e6fda83..e957aa28ca 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py
@@ -34,9 +34,9 @@ from synapse.rest.admin._base import ( assert_user_is_admin, ) from synapse.storage.databases.main.room import RoomSortOrder -from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, RoomID, UserID, create_requester +from synapse.types.state import StateFilter from synapse.util import json_decoder if TYPE_CHECKING: diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 44f622bcce..b4b92f0c99 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py
@@ -875,19 +875,21 @@ class AccountStatusRestServlet(RestServlet): def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - EmailPasswordRequestTokenRestServlet(hs).register(http_server) - PasswordRestServlet(hs).register(http_server) - DeactivateAccountRestServlet(hs).register(http_server) - EmailThreepidRequestTokenRestServlet(hs).register(http_server) - MsisdnThreepidRequestTokenRestServlet(hs).register(http_server) - AddThreepidEmailSubmitTokenServlet(hs).register(http_server) - AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: + EmailPasswordRequestTokenRestServlet(hs).register(http_server) + PasswordRestServlet(hs).register(http_server) + DeactivateAccountRestServlet(hs).register(http_server) + EmailThreepidRequestTokenRestServlet(hs).register(http_server) + MsisdnThreepidRequestTokenRestServlet(hs).register(http_server) + AddThreepidEmailSubmitTokenServlet(hs).register(http_server) + AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server) ThreepidRestServlet(hs).register(http_server) - ThreepidAddRestServlet(hs).register(http_server) - ThreepidBindRestServlet(hs).register(http_server) - ThreepidUnbindRestServlet(hs).register(http_server) - ThreepidDeleteRestServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: + ThreepidAddRestServlet(hs).register(http_server) + ThreepidBindRestServlet(hs).register(http_server) + ThreepidUnbindRestServlet(hs).register(http_server) + ThreepidDeleteRestServlet(hs).register(http_server) WhoamiRestServlet(hs).register(http_server) - if hs.config.experimental.msc3720_enabled: + if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled: AccountStatusRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 69b803f9f8..486c6dbbc5 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py
@@ -342,8 +342,10 @@ class ClaimDehydratedDeviceServlet(RestServlet): def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - DeleteDevicesRestServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: + DeleteDevicesRestServlet(hs).register(http_server) DevicesRestServlet(hs).register(http_server) - DeviceRestServlet(hs).register(http_server) - DehydratedDeviceServlet(hs).register(http_server) - ClaimDehydratedDeviceServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: + DeviceRestServlet(hs).register(http_server) + DehydratedDeviceServlet(hs).register(http_server) + ClaimDehydratedDeviceServlet(hs).register(http_server) diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index ee038c7192..7873b363c0 100644 --- a/synapse/rest/client/keys.py +++ b/synapse/rest/client/keys.py
@@ -376,5 +376,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: KeyQueryServlet(hs).register(http_server) KeyChangesServlet(hs).register(http_server) OneTimeKeyServlet(hs).register(http_server) - SigningKeyUploadServlet(hs).register(http_server) - SignaturesUploadServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: + SigningKeyUploadServlet(hs).register(http_server) + SignaturesUploadServlet(hs).register(http_server) diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py
index 18a282b22c..28b7d30ea8 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py
@@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest -from synapse.types import JsonDict +from synapse.types import EventID, JsonDict, RoomID from ._base import client_patterns @@ -56,6 +56,9 @@ class ReceiptRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) + if not RoomID.is_valid(room_id) or not event_id.startswith(EventID.SIGIL): + raise SynapseError(400, "A valid room ID and event ID must be specified") + if receipt_type not in self._known_receipt_types: raise SynapseError( 400, diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index de810ae3ec..3cb1e7e375 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py
@@ -949,9 +949,10 @@ def _calculate_registration_flows( def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - EmailRegisterRequestTokenRestServlet(hs).register(http_server) - MsisdnRegisterRequestTokenRestServlet(hs).register(http_server) - UsernameAvailabilityRestServlet(hs).register(http_server) - RegistrationSubmitTokenServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: + EmailRegisterRequestTokenRestServlet(hs).register(http_server) + MsisdnRegisterRequestTokenRestServlet(hs).register(http_server) + UsernameAvailabilityRestServlet(hs).register(http_server) + RegistrationSubmitTokenServlet(hs).register(http_server) RegistrationTokenValidityRestServlet(hs).register(http_server) RegisterRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 91cb791139..790614d721 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py
@@ -55,9 +55,9 @@ from synapse.logging.opentracing import set_tag from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache -from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID +from synapse.types.state import StateFilter from synapse.util import json_decoder from synapse.util.cancellation import cancellable from synapse.util.stringutils import parse_and_validate_server_name, random_string @@ -396,12 +396,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet): ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) - try: - content = parse_json_object_from_request(request) - except Exception: - # Turns out we used to ignore the body entirely, and some clients - # cheekily send invalid bodies. - content = {} + content = parse_json_object_from_request(request, allow_empty_body=True) # twisted.web.server.Request.args is incorrectly defined as Optional[Any] args: Dict[bytes, List[bytes]] = request.args # type: ignore @@ -952,12 +947,7 @@ class RoomMembershipRestServlet(TransactionRestServlet): }: raise AuthError(403, "Guest access not allowed") - try: - content = parse_json_object_from_request(request) - except Exception: - # Turns out we used to ignore the body entirely, and some clients - # cheekily send invalid bodies. - content = {} + content = parse_json_object_from_request(request, allow_empty_body=True) if membership_action == "invite" and all( key in content for key in ("medium", "address") @@ -1284,17 +1274,14 @@ class TimestampLookupRestServlet(RestServlet): `dir` can be `f` or `b` to indicate forwards and backwards in time from the given timestamp. - GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction> + GET /_matrix/client/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction> { "event_id": ... } """ PATTERNS = ( - re.compile( - "^/_matrix/client/unstable/org.matrix.msc3030" - "/rooms/(?P<room_id>[^/]*)/timestamp_to_event$" - ), + re.compile("^/_matrix/client/v1/rooms/(?P<room_id>[^/]*)/timestamp_to_event$"), ) def __init__(self, hs: "HomeServer"): @@ -1398,9 +1385,7 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet): ) -def register_servlets( - hs: "HomeServer", http_server: HttpServer, is_worker: bool = False -) -> None: +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: RoomStateEventRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server) JoinedRoomMemberListRestServlet(hs).register(http_server) @@ -1421,11 +1406,10 @@ def register_servlets( RoomAliasListServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) RoomCreateRestServlet(hs).register(http_server) - if hs.config.experimental.msc3030_enabled: - TimestampLookupRestServlet(hs).register(http_server) + TimestampLookupRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. - if not is_worker: + if hs.config.worker.worker_app is None: RoomForgetRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py
index 46a8b03829..55d52f0b28 100644 --- a/synapse/rest/client/sendtodevice.py +++ b/synapse/rest/client/sendtodevice.py
@@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet): def on_PUT( self, request: SynapseRequest, message_type: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("message_type", message_type) set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id diff --git a/synapse/rest/client/user_directory.py b/synapse/rest/client/user_directory.py
index 116c982ce6..4670fad608 100644 --- a/synapse/rest/client/user_directory.py +++ b/synapse/rest/client/user_directory.py
@@ -63,8 +63,8 @@ class UserDirectorySearchRestServlet(RestServlet): body = parse_json_object_from_request(request) - limit = body.get("limit", 10) - limit = min(limit, 50) + limit = int(body.get("limit", 10)) + limit = max(min(limit, 50), 0) try: search_term = body["search_term"] diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 180a11ef88..e19c0946c0 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py
@@ -77,6 +77,7 @@ class VersionsRestServlet(RestServlet): "v1.2", "v1.3", "v1.4", + "v1.5", ], # as per MSC1497: "unstable_features": { @@ -101,8 +102,6 @@ class VersionsRestServlet(RestServlet): "org.matrix.msc3827.stable": True, # Adds support for importing historical messages as per MSC2716 "org.matrix.msc2716": self.config.experimental.msc2716_enabled, - # Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030 - "org.matrix.msc3030": self.config.experimental.msc3030_enabled, # Adds support for thread relations, per MSC3440. "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above # Support for thread read receipts & notification counts. diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 40b0d39eb2..c70e1837af 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py
@@ -24,7 +24,6 @@ from matrix_common.types.mxc_uri import MXCUri import twisted.internet.error import twisted.web.http from twisted.internet.defer import Deferred -from twisted.web.resource import Resource from synapse.api.errors import ( FederationDeniedError, @@ -35,6 +34,7 @@ from synapse.api.errors import ( ) from synapse.config._base import ConfigError from synapse.config.repository import ThumbnailRequirement +from synapse.http.server import UnrecognizedRequestResource from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread from synapse.metrics.background_process_metrics import run_as_background_process @@ -1046,7 +1046,7 @@ class MediaRepository: return removed_media, len(removed_media) -class MediaRepositoryResource(Resource): +class MediaRepositoryResource(UnrecognizedRequestResource): """File uploading and downloading. Uploads are POSTed to a resource which returns a token which is used to GET diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 833ffec3de..ee5469d5a8 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py
@@ -44,8 +44,8 @@ from synapse.logging.context import ContextResourceUsage from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.state import StateFilter from synapse.types import StateMap +from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 2056ecb2c3..a99aea8926 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py
@@ -544,6 +544,48 @@ class BackgroundUpdater: The named index will be dropped upon completion of the new index. """ + async def updater(progress: JsonDict, batch_size: int) -> int: + await self.create_index_in_background( + index_name=index_name, + table=table, + columns=columns, + where_clause=where_clause, + unique=unique, + psql_only=psql_only, + replaces_index=replaces_index, + ) + await self._end_background_update(update_name) + return 1 + + self._background_update_handlers[update_name] = _BackgroundUpdateHandler( + updater, oneshot=True + ) + + async def create_index_in_background( + self, + index_name: str, + table: str, + columns: Iterable[str], + where_clause: Optional[str] = None, + unique: bool = False, + psql_only: bool = False, + replaces_index: Optional[str] = None, + ) -> None: + """Add an index in the background. + + Args: + update_name: update_name to register for + index_name: name of index to add + table: table to add index to + columns: columns/expressions to include in index + where_clause: A WHERE clause to specify a partial unique index. + unique: true to make a UNIQUE index + psql_only: true to only create this index on psql databases (useful + for virtual sqlite tables) + replaces_index: The name of an index that this index replaces. + The named index will be dropped upon completion of the new index. + """ + def create_index_psql(conn: Connection) -> None: conn.rollback() # postgres insists on autocommit for the index @@ -618,16 +660,11 @@ class BackgroundUpdater: else: runner = create_index_sqlite - async def updater(progress: JsonDict, batch_size: int) -> int: - if runner is not None: - logger.info("Adding index %s to %s", index_name, table) - await self.db_pool.runWithConnection(runner) - await self._end_background_update(update_name) - return 1 + if runner is None: + return - self._background_update_handlers[update_name] = _BackgroundUpdateHandler( - updater, oneshot=True - ) + logger.info("Adding index %s to %s", index_name, table) + await self.db_pool.runWithConnection(runner) async def _end_background_update(self, update_name: str) -> None: """Removes a completed background update task from the queue. diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 33ffef521b..f1d2c71c91 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -58,13 +58,13 @@ from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.state import StateFilter from synapse.types import ( PersistedEventPosition, RoomStreamToken, StateMap, get_domain_from_id, ) +from synapse.types.state import StateFilter from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results from synapse.util.metrics import Measure diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 2b31ce54bb..26d79c6e62 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py
@@ -31,12 +31,12 @@ from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.logging.opentracing import tag_args, trace from synapse.storage.roommember import ProfileInfo -from synapse.storage.state import StateFilter from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, PartialStateEventsTracker, ) from synapse.types import MutableStateMap, StateMap +from synapse.types.state import StateFilter from synapse.util.cancellation import cancellable if TYPE_CHECKING: diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index a14b13aec8..55bcb90001 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -1129,7 +1129,6 @@ class DatabasePool: values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, desc: str = "simple_upsert", - lock: bool = True, ) -> bool: """Insert a row with values + insertion_values; on conflict, update with values. @@ -1154,21 +1153,12 @@ class DatabasePool: requiring that a unique index exist on the column names used to detect a conflict (i.e. `keyvalues.keys()`). - If there is no such index, we can "emulate" an upsert with a SELECT followed - by either an INSERT or an UPDATE. This is unsafe: we cannot make the same - atomicity guarantees that a native upsert can and are very vulnerable to races - and crashes. Therefore if we wish to upsert without an appropriate unique index, - we must either: - - 1. Acquire a table-level lock before the emulated upsert (`lock=True`), or - 2. VERY CAREFULLY ensure that we are the only thread and worker which will be - writing to this table, in which case we can proceed without a lock - (`lock=False`). - - Generally speaking, you should use `lock=True`. If the table in question has a - unique index[*], this class will use a native upsert (which is atomic and so can - ignore the `lock` argument). Otherwise this class will use an emulated upsert, - in which case we want the safer option unless we been VERY CAREFUL. + If there is no such index yet[*], we can "emulate" an upsert with a SELECT + followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters + run at the SERIALIZABLE isolation level: we cannot make the same atomicity + guarantees that a native upsert can and are very vulnerable to races and + crashes. Therefore to upsert without an appropriate unique index, we acquire a + table-level lock before the emulated upsert. [*]: Some tables have unique indices added to them in the background. Those tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES, @@ -1189,7 +1179,6 @@ class DatabasePool: values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting desc: description of the transaction, for logging and metrics - lock: True to lock the table when doing the upsert. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) @@ -1209,7 +1198,6 @@ class DatabasePool: keyvalues, values, insertion_values, - lock=lock, db_autocommit=autocommit, ) except self.engine.module.IntegrityError as e: @@ -1232,7 +1220,6 @@ class DatabasePool: values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, where_clause: Optional[str] = None, - lock: bool = True, ) -> bool: """ Pick the UPSERT method which works best on the platform. Either the @@ -1245,8 +1232,6 @@ class DatabasePool: values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting where_clause: An index predicate to apply to the upsert. - lock: True to lock the table when doing the upsert. Unused when performing - a native upsert. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) @@ -1270,7 +1255,6 @@ class DatabasePool: values, insertion_values=insertion_values, where_clause=where_clause, - lock=lock, ) def simple_upsert_txn_emulated( @@ -1291,14 +1275,15 @@ class DatabasePool: insertion_values: additional key/values to use only when inserting where_clause: An index predicate to apply to the upsert. lock: True to lock the table when doing the upsert. + Must not be False unless the table has already been locked. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) """ insertion_values = insertion_values or {} - # We need to lock the table :(, unless we're *really* careful if lock: + # We need to lock the table :( self.engine.lock_table(txn, table) def _getwhere(key: str) -> str: @@ -1406,7 +1391,6 @@ class DatabasePool: value_names: Collection[str], value_values: Collection[Collection[Any]], desc: str, - lock: bool = True, ) -> None: """ Upsert, many times. @@ -1418,8 +1402,6 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused when performing - a native upsert. """ # We can autocommit if it safe to upsert @@ -1433,7 +1415,6 @@ class DatabasePool: key_values, value_names, value_values, - lock=lock, db_autocommit=autocommit, ) @@ -1445,7 +1426,6 @@ class DatabasePool: key_values: Collection[Iterable[Any]], value_names: Collection[str], value_values: Iterable[Iterable[Any]], - lock: bool = True, ) -> None: """ Upsert, many times. @@ -1457,8 +1437,6 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused when performing - a native upsert. """ if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( @@ -1466,7 +1444,12 @@ class DatabasePool: ) else: return self.simple_upsert_many_txn_emulated( - txn, table, key_names, key_values, value_names, value_values, lock=lock + txn, + table, + key_names, + key_values, + value_names, + value_values, ) def simple_upsert_many_txn_emulated( @@ -1477,7 +1460,6 @@ class DatabasePool: key_values: Collection[Iterable[Any]], value_names: Collection[str], value_values: Iterable[Iterable[Any]], - lock: bool = True, ) -> None: """ Upsert, many times, but without native UPSERT support or batching. @@ -1489,18 +1471,16 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. """ # No value columns, therefore make a blank list so that the following # zip() works correctly. if not value_names: value_values = [() for x in range(len(key_values))] - if lock: - # Lock the table just once, to prevent it being done once per row. - # Note that, according to Postgres' documentation, once obtained, - # the lock is held for the remainder of the current transaction. - self.engine.lock_table(txn, "user_ips") + # Lock the table just once, to prevent it being done once per row. + # Note that, according to Postgres' documentation, once obtained, + # the lock is held for the remainder of the current transaction. + self.engine.lock_table(txn, "user_ips") for keyv, valv in zip(key_values, value_values): _keys = {x: y for x, y in zip(key_names, keyv)} diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 282687ebce..07908c41d9 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -449,9 +449,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) content_json = json_encoder.encode(content) async with self._account_data_id_gen.get_next() as next_id: - # no need to lock here as room_account_data has a unique constraint - # on (user_id, room_id, account_data_type) so simple_upsert will - # retry if there is a conflict. await self.db_pool.simple_upsert( desc="add_room_account_data", table="room_account_data", @@ -461,7 +458,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "account_data_type": account_data_type, }, values={"stream_id": next_id, "content": content_json}, - lock=False, ) self._account_data_stream_cache.entity_has_changed(user_id, next_id) @@ -517,15 +513,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) -> None: content_json = json_encoder.encode(content) - # no need to lock here as account_data has a unique constraint on - # (user_id, account_data_type) so simple_upsert will retry if - # there is a conflict. self.db_pool.simple_upsert_txn( txn, table="account_data", keyvalues={"user_id": user_id, "account_data_type": account_data_type}, values={"stream_id": next_id, "content": content_json}, - lock=False, ) # Ignored users get denormalized into a separate table as an optimisation. diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 63046c0527..c2c8018ee2 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py
@@ -20,7 +20,7 @@ from synapse.appservice import ( ApplicationService, ApplicationServiceState, AppServiceTransaction, - TransactionOneTimeKeyCounts, + TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys, ) from synapse.config.appservice import load_appservices @@ -260,7 +260,7 @@ class ApplicationServiceTransactionWorkerStore( events: List[EventBase], ephemeral: List[JsonDict], to_device_messages: List[JsonDict], - one_time_key_counts: TransactionOneTimeKeyCounts, + one_time_keys_count: TransactionOneTimeKeysCount, unused_fallback_keys: TransactionUnusedFallbackKeys, device_list_summary: DeviceListUpdates, ) -> AppServiceTransaction: @@ -273,7 +273,7 @@ class ApplicationServiceTransactionWorkerStore( events: A list of persistent events to put in the transaction. ephemeral: A list of ephemeral events to put in the transaction. to_device_messages: A list of to-device messages to put in the transaction. - one_time_key_counts: Counts of remaining one-time keys for relevant + one_time_keys_count: Counts of remaining one-time keys for relevant appservice devices in the transaction. unused_fallback_keys: Lists of unused fallback keys for relevant appservice devices in the transaction. @@ -299,7 +299,7 @@ class ApplicationServiceTransactionWorkerStore( events=events, ephemeral=ephemeral, to_device_messages=to_device_messages, - one_time_key_counts=one_time_key_counts, + one_time_keys_count=one_time_keys_count, unused_fallback_keys=unused_fallback_keys, device_list_summary=device_list_summary, ) @@ -379,7 +379,7 @@ class ApplicationServiceTransactionWorkerStore( events=events, ephemeral=[], to_device_messages=[], - one_time_key_counts={}, + one_time_keys_count={}, unused_fallback_keys={}, device_list_summary=DeviceListUpdates(), ) @@ -451,8 +451,6 @@ class ApplicationServiceTransactionWorkerStore( table="application_services_state", keyvalues={"as_id": service.id}, values={f"{stream_type}_stream_id": pos}, - # no need to lock when emulating upsert: as_id is a unique key - lock=False, desc="set_appservice_stream_type_pos", ) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..48a54d9cb8 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -26,8 +26,15 @@ from typing import ( cast, ) +from synapse.api.constants import EventContentFields from synapse.logging import issue9533_logger -from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + trace, +) from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore): (recipient_user_id, recipient_device_id), [] ).append(message_dict) + # start a new span for each message, so that we can tag each separately + with start_active_span("get_to_device_message"): + set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"]) + set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id) + set_tag( + SynapseTags.TO_DEVICE_MSGID, + message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID), + ) + if limit is not None and rowcount == limit: # We ended up bumping up against the message limit. There may be more messages # to retrieve. Return what we have, as well as the last stream position that @@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore): ], ) - if remote_messages_by_destination: - issue9533_logger.debug( - "Queued outgoing to-device messages with stream_id %i for %s", - stream_id, - list(remote_messages_by_destination.keys()), - ) + for destination, edu in remote_messages_by_destination.items(): + if issue9533_logger.isEnabledFor(logging.DEBUG): + issue9533_logger.debug( + "Queued outgoing to-device messages with " + "stream_id %i, EDU message_id %s, type %s for %s: %s", + stream_id, + edu["message_id"], + edu["type"], + destination, + [ + f"{user_id}/{device_id} (msgid " + f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})" + for (user_id, messages_by_device) in edu["messages"].items() + for (device_id, msg) in messages_by_device.items() + ], + ) + + for (user_id, messages_by_device) in edu["messages"].items(): + for (device_id, msg) in messages_by_device.items(): + with start_active_span("store_outgoing_to_device_message"): + set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"]) + set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"]) + set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) + set_tag( + SynapseTags.TO_DEVICE_MSGID, + msg.get(EventContentFields.TO_DEVICE_MSGID), + ) async with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self._clock.time_msec() @@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): # Only insert into the local inbox if the device exists on # this server device_id = row["device_id"] - message_json = json_encoder.encode(messages_by_device[device_id]) + + with start_active_span("serialise_to_device_message"): + msg = messages_by_device[device_id] + set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) + set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) + set_tag( + SynapseTags.TO_DEVICE_MSGID, + msg["content"].get(EventContentFields.TO_DEVICE_MSGID), + ) + message_json = json_encoder.encode(msg) + messages_json_for_user[device_id] = message_json if messages_json_for_user: @@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore): ], ) - issue9533_logger.debug( - "Stored to-device messages with stream_id %i for %s", - stream_id, - [ - (user_id, device_id) - for (user_id, messages_by_device) in local_by_user_then_device.items() - for device_id in messages_by_device.keys() - ], - ) + if issue9533_logger.isEnabledFor(logging.DEBUG): + issue9533_logger.debug( + "Stored to-device messages with stream_id %i: %s", + stream_id, + [ + f"{user_id}/{device_id} (msgid " + f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})" + for ( + user_id, + messages_by_device, + ) in messages_by_user_then_device.items() + for (device_id, msg) in messages_by_device.items() + ], + ) class DeviceInboxBackgroundUpdateStore(SQLBaseStore): diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 05a193f889..a5bb4d404e 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -58,7 +58,10 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.caches.stream_change_cache import ( + AllEntitiesChangedResult, + StreamChangeCache, +) from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -799,7 +802,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_cached_device_list_changes( self, from_key: int, - ) -> Optional[List[str]]: + ) -> AllEntitiesChangedResult: """Get set of users whose devices have changed since `from_key`, or None if that information is not in our cache. """ @@ -807,10 +810,58 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return self._device_list_stream_cache.get_all_entities_changed(from_key) @cancellable + async def get_all_devices_changed( + self, + from_key: int, + to_key: int, + ) -> Set[str]: + """Get all users whose devices have changed in the given range. + + Args: + from_key: The minimum device lists stream token to query device list + changes for, exclusive. + to_key: The maximum device lists stream token to query device list + changes for, inclusive. + + Returns: + The set of user_ids whose devices have changed since `from_key` + (exclusive) until `to_key` (inclusive). + """ + + result = self._device_list_stream_cache.get_all_entities_changed(from_key) + + if result.hit: + # We know which users might have changed devices. + if not result.entities: + # If no users then we can return early. + return set() + + # Otherwise we need to filter down the list + return await self.get_users_whose_devices_changed( + from_key, result.entities, to_key + ) + + # If the cache didn't tell us anything, we just need to query the full + # range. + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream + WHERE ? < stream_id AND stream_id <= ? + """ + + rows = await self.db_pool.execute( + "get_all_devices_changed", + None, + sql, + from_key, + to_key, + ) + return {u for u, in rows} + + @cancellable async def get_users_whose_devices_changed( self, from_key: int, - user_ids: Optional[Collection[str]] = None, + user_ids: Collection[str], to_key: Optional[int] = None, ) -> Set[str]: """Get set of users whose devices have changed since `from_key` that @@ -830,46 +881,31 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): """ # Get set of users who *may* have changed. Users not in the returned # list have definitely not changed. - user_ids_to_check: Optional[Collection[str]] - if user_ids is None: - # Get set of all users that have had device list changes since 'from_key' - user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed( - from_key - ) - else: - # The same as above, but filter results to only those users in 'user_ids' - user_ids_to_check = self._device_list_stream_cache.get_entities_changed( - user_ids, from_key - ) + user_ids_to_check = self._device_list_stream_cache.get_entities_changed( + user_ids, from_key + ) + # If an empty set was returned, there's nothing to do. if not user_ids_to_check: return set() - def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: - changes: Set[str] = set() - - stream_id_where_clause = "stream_id > ?" - sql_args = [from_key] - - if to_key: - stream_id_where_clause += " AND stream_id <= ?" - sql_args.append(to_key) + if to_key is None: + to_key = self._device_list_id_gen.get_current_token() - sql = f""" + def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: + sql = """ SELECT DISTINCT user_id FROM device_lists_stream - WHERE {stream_id_where_clause} - AND + WHERE ? < stream_id AND stream_id <= ? AND %s """ + changes: Set[str] = set() + # Query device changes with a batch of users at a time - # Assertion for mypy's benefit; see also - # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions - assert user_ids_to_check is not None for chunk in batch_iter(user_ids_to_check, 100): clause, args = make_in_list_sql_clause( txn.database_engine, "user_id", chunk ) - txn.execute(sql + clause, sql_args + args) + txn.execute(sql % (clause,), [from_key, to_key] + args) changes.update(user_id for user_id, in txn) return changes @@ -1744,9 +1780,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_remote_cache", keyvalues={"user_id": user_id, "device_id": device_id}, values={"content": json_encoder.encode(content)}, - # we don't need to lock, because we assume we are the only thread - # updating this user's devices. - lock=False, ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id)) @@ -1760,9 +1793,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, values={"stream_id": stream_id}, - # again, we can assume we are the only thread updating this user's - # extremity. - lock=False, ) async def update_remote_device_list_cache( @@ -1815,9 +1845,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, values={"stream_id": stream_id}, - # we don't need to lock, because we can assume we are the only thread - # updating this user's extremity. - lock=False, ) async def add_device_change_to_streams( diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index cf33e73e2b..4c691642e2 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -33,7 +33,7 @@ from typing_extensions import Literal from synapse.api.constants import DeviceKeyAlgorithms from synapse.appservice import ( - TransactionOneTimeKeyCounts, + TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys, ) from synapse.logging.opentracing import log_kv, set_tag, trace @@ -140,7 +140,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker @cancellable async def get_e2e_device_keys_for_cs_api( self, - query_list: List[Tuple[str, Optional[str]]], + query_list: Collection[Tuple[str, Optional[str]]], include_displaynames: bool = True, ) -> Dict[str, Dict[str, JsonDict]]: """Fetch a list of device keys, formatted suitably for the C/S API. @@ -514,7 +514,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker async def count_bulk_e2e_one_time_keys_for_as( self, user_ids: Collection[str] - ) -> TransactionOneTimeKeyCounts: + ) -> TransactionOneTimeKeysCount: """ Counts, in bulk, the one-time keys for all the users specified. Intended to be used by application services for populating OTK counts in @@ -528,7 +528,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker def _count_bulk_e2e_one_time_keys_txn( txn: LoggingTransaction, - ) -> TransactionOneTimeKeyCounts: + ) -> TransactionOneTimeKeysCount: user_in_where_clause, user_parameters = make_in_list_sql_clause( self.database_engine, "user_id", user_ids ) @@ -541,7 +541,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker """ txn.execute(sql, user_parameters) - result: TransactionOneTimeKeyCounts = {} + result: TransactionOneTimeKeysCount = {} for user_id, device_id, algorithm, count in txn: # We deliberately construct empty dictionaries for diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 309a4ba664..bbee02ab18 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -1686,7 +1686,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas }, insertion_values={}, desc="insert_insertion_extremity", - lock=False, ) async def insert_received_event_to_staging( diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b283ab0f9c..7ebe34f773 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -74,6 +74,7 @@ receipt. """ import logging +from collections import defaultdict from typing import ( TYPE_CHECKING, Collection, @@ -95,6 +96,7 @@ from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + PostgresEngine, ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore @@ -463,6 +465,153 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas return result + async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: + """Get the notification count by room for a user. Only considers notifications, + not highlight or unread counts, and threads are currently aggregated under their room. + + This function is intentionally not cached because it is called to calculate the + unread badge for push notifications and thus the result is expected to change. + + Note that this function assumes the user is a member of the room. Because + summary rows are not removed when a user leaves a room, the caller must + filter out those results from the result. + + Returns: + A map of room ID to notification counts for the given user. + """ + return await self.db_pool.runInteraction( + "get_unread_counts_by_room_for_user", + self._get_unread_counts_by_room_for_user_txn, + user_id, + ) + + def _get_unread_counts_by_room_for_user_txn( + self, txn: LoggingTransaction, user_id: str + ) -> Dict[str, int]: + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) + args.extend([user_id, user_id]) + + receipts_cte = f""" + WITH all_receipts AS ( + SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + {receipt_types_clause} + AND user_id = ? + GROUP BY room_id, thread_id + ) + """ + + receipts_joins = """ + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS threaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NOT NULL + ) AS threaded_receipts USING (room_id, thread_id) + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NULL + ) AS unthreaded_receipts USING (room_id) + """ + + # First get summary counts by room / thread for the user. We use the max receipt + # stream ordering of both threaded & unthreaded receipts to compare against the + # summary table. + # + # PostgreSQL and SQLite differ in comparing scalar numerics. + if isinstance(self.database_engine, PostgresEngine): + # GREATEST ignores NULLs. + max_clause = """GREATEST( + threaded_receipt_stream_ordering, + unthreaded_receipt_stream_ordering + )""" + else: + # MAX returns NULL if any are NULL, so COALESCE to 0 first. + max_clause = """MAX( + COALESCE(threaded_receipt_stream_ordering, 0), + COALESCE(unthreaded_receipt_stream_ordering, 0) + )""" + + sql = f""" + {receipts_cte} + SELECT eps.room_id, eps.thread_id, notif_count + FROM event_push_summary AS eps + {receipts_joins} + WHERE user_id = ? + AND notif_count != 0 + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) + OR last_receipt_stream_ordering = {max_clause} + ) + """ + txn.execute(sql, args) + + seen_thread_ids = set() + room_to_count: Dict[str, int] = defaultdict(int) + + for room_id, thread_id, notif_count in txn: + room_to_count[room_id] += notif_count + seen_thread_ids.add(thread_id) + + # Now get any event push actions that haven't been rotated using the same OR + # join and filter by receipt and event push summary rotated up to stream ordering. + sql = f""" + {receipts_cte} + SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + {receipts_joins} + WHERE user_id = ? + AND epa.notif = 1 + AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + GROUP BY epa.room_id, epa.thread_id + """ + txn.execute(sql, args) + + for room_id, thread_id, notif_count in txn: + # Note: only count push actions we have valid summaries for with up to date receipt. + if thread_id not in seen_thread_ids: + continue + room_to_count[room_id] += notif_count + + thread_id_clause, thread_ids_args = make_in_list_sql_clause( + self.database_engine, "epa.thread_id", seen_thread_ids + ) + + # Finally re-check event_push_actions for any rooms not in the summary, ignoring + # the rotated up-to position. This handles the case where a read receipt has arrived + # but not been rotated meaning the summary table is out of date, so we go back to + # the push actions table. + sql = f""" + {receipts_cte} + SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + {receipts_joins} + WHERE user_id = ? + AND NOT {thread_id_clause} + AND epa.notif = 1 + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + GROUP BY epa.room_id + """ + + args.extend(thread_ids_args) + txn.execute(sql, args) + + for room_id, notif_count in txn: + room_to_count[room_id] += notif_count + + return room_to_count + @cached(tree=True, max_entries=5000, iterable=True) async def get_unread_event_push_actions_by_room_for_user( self, diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 12ad44dbb3..d4c64c46ad 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -84,7 +84,10 @@ def _load_rules( push_rules = PushRules(ruleslist) filtered_rules = FilteredPushRules( - push_rules, enabled_map, msc3664_enabled=experimental_config.msc3664_enabled + push_rules, + enabled_map, + msc3664_enabled=experimental_config.msc3664_enabled, + msc1767_enabled=experimental_config.msc1767_enabled, ) return filtered_rules diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index fee37b9ce4..40fd781a6a 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py
@@ -325,14 +325,11 @@ class PusherWorkerStore(SQLBaseStore): async def set_throttle_params( self, pusher_id: str, room_id: str, params: ThrottleParams ) -> None: - # no need to lock because `pusher_throttle` has a primary key on - # (pusher, room_id) so simple_upsert will retry await self.db_pool.simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, {"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms}, desc="set_throttle_params", - lock=False, ) async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int: @@ -589,8 +586,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): device_id: Optional[str] = None, ) -> None: async with self._pushers_id_gen.get_next() as stream_id: - # no need to lock because `pushers` has a unique key on - # (app_id, pushkey, user_name) so simple_upsert will retry await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, @@ -609,7 +604,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): "device_id": device_id, }, desc="add_pusher", - lock=False, ) user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index a580e4bdda..e06725f69c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -924,39 +924,6 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): return batch_size - async def _create_receipts_index(self, index_name: str, table: str) -> None: - """Adds a unique index on `(room_id, receipt_type, user_id)` to the given - receipts table, for non-thread receipts.""" - - def _create_index(conn: LoggingDatabaseConnection) -> None: - conn.rollback() - - # we have to set autocommit, because postgres refuses to - # CREATE INDEX CONCURRENTLY without it. - if isinstance(self.database_engine, PostgresEngine): - conn.set_session(autocommit=True) - - try: - c = conn.cursor() - - # Now that the duplicates are gone, we can create the index. - concurrently = ( - "CONCURRENTLY" - if isinstance(self.database_engine, PostgresEngine) - else "" - ) - sql = f""" - CREATE UNIQUE INDEX {concurrently} {index_name} - ON {table}(room_id, receipt_type, user_id) - WHERE thread_id IS NULL - """ - c.execute(sql) - finally: - if isinstance(self.database_engine, PostgresEngine): - conn.set_session(autocommit=False) - - await self.db_pool.runWithConnection(_create_index) - async def _background_receipts_linearized_unique_index( self, progress: dict, batch_size: int ) -> int: @@ -999,9 +966,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): _remote_duplicate_receipts_txn, ) - await self._create_receipts_index( - "receipts_linearized_unique_index", - "receipts_linearized", + await self.db_pool.updates.create_index_in_background( + index_name="receipts_linearized_unique_index", + table="receipts_linearized", + columns=["room_id", "receipt_type", "user_id"], + where_clause="thread_id IS NULL", + unique=True, ) await self.db_pool.updates._end_background_update( @@ -1050,9 +1020,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): _remote_duplicate_receipts_txn, ) - await self._create_receipts_index( - "receipts_graph_unique_index", - "receipts_graph", + await self.db_pool.updates.create_index_in_background( + index_name="receipts_graph_unique_index", + table="receipts_graph", + columns=["room_id", "receipt_type", "user_id"], + where_clause="thread_id IS NULL", + unique=True, ) await self.db_pool.updates._end_background_update( diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 52ad947c6c..78906a5e1d 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -1,5 +1,5 @@ # Copyright 2014-2016 OpenMarket Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -50,8 +50,14 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor -from synapse.storage.util.id_generators import IdGenerator +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + IdGenerator, + MultiWriterIdGenerator, + StreamIdGenerator, +) from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): self.config: HomeServerConfig = hs.config + self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator + + if isinstance(database.engine, PostgresEngine): + self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="un_partial_stated_room_stream", + instance_name=self._instance_name, + tables=[ + ("un_partial_stated_room_stream", "instance_name", "stream_id") + ], + sequence_name="un_partial_stated_room_stream_sequence", + # TODO(faster_joins, multiple writers) Support multiple writers. + writers=["master"], + ) + else: + self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator( + db_conn, "un_partial_stated_room_stream", "stream_id" + ) + async def store_room( self, room_id: str, @@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return room_servers - async def clear_partial_state_room(self, room_id: str) -> bool: - """Clears the partial state flag for a room. - - Args: - room_id: The room whose partial state flag is to be cleared. - - Returns: - `True` if the partial state flag has been cleared successfully. - - `False` if the partial state flag could not be cleared because the room - still contains events with partial state. - """ - try: - await self.db_pool.runInteraction( - "clear_partial_state_room", self._clear_partial_state_room_txn, room_id - ) - return True - except self.db_pool.engine.module.IntegrityError as e: - # Assume that any `IntegrityError`s are due to partial state events. - logger.info( - "Exception while clearing lazy partial-state-room %s, retrying: %s", - room_id, - e, - ) - return False - - def _clear_partial_state_room_txn( - self, txn: LoggingTransaction, room_id: str - ) -> None: - DatabasePool.simple_delete_txn( - txn, - table="partial_state_rooms_servers", - keyvalues={"room_id": room_id}, - ) - DatabasePool.simple_delete_one_txn( - txn, - table="partial_state_rooms", - keyvalues={"room_id": room_id}, - ) - self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) - self._invalidate_cache_and_stream( - txn, self.get_partial_state_servers_at_join, (room_id,) - ) - - # We now delete anything from `device_lists_remote_pending` with a - # stream ID less than the minimum - # `partial_state_rooms.device_lists_stream_id`, as we no longer need them. - device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn( - txn, - table="partial_state_rooms", - keyvalues={}, - retcol="MIN(device_lists_stream_id)", - allow_none=True, - ) - if device_lists_stream_id is None: - # There are no rooms being currently partially joined, so we delete everything. - txn.execute("DELETE FROM device_lists_remote_pending") - else: - sql = """ - DELETE FROM device_lists_remote_pending - WHERE stream_id <= ? - """ - txn.execute(sql, (device_lists_stream_id,)) - @cached() async def is_partial_state_room(self, room_id: str) -> bool: """Checks if this room has partial state. @@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): ) return result["join_event_id"], result["device_lists_stream_id"] + def get_un_partial_stated_rooms_token(self) -> int: + # TODO(faster_joins, multiple writers): This is inappropriate if there + # are multiple writers because workers that don't write often will + # hold all readers up. + # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an + # explanation.) + return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + + async def get_un_partial_stated_rooms_from_stream( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: + """Get updates for caches replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_un_partial_stated_rooms_from_stream_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: + sql = """ + SELECT stream_id, room_id + FROM un_partial_stated_room_stream + WHERE ? < stream_id AND stream_id <= ? AND instance_name = ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, instance_name, limit)) + updates = [(row[0], (row[1],)) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db_pool.runInteraction( + "get_un_partial_stated_rooms_from_stream", + get_un_partial_stated_rooms_from_stream_txn, + ) + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") + self._instance_name = hs.get_instance_name() + async def upsert_room_on_join( self, room_id: str, room_version: RoomVersion, state_events: List[EventBase] ) -> None: @@ -1847,9 +1871,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): "creator": room_creator, "has_auth_chain_index": has_auth_chain_index, }, - # rooms has a unique constraint on room_id, so no need to lock when doing an - # emulated upsert. - lock=False, ) async def store_partial_state_room( @@ -1970,9 +1991,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): "creator": "", "has_auth_chain_index": has_auth_chain_index, }, - # rooms has a unique constraint on room_id, so no need to lock when doing an - # emulated upsert. - lock=False, ) async def set_room_is_public(self, room_id: str, is_public: bool) -> None: @@ -2276,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): self.is_room_blocked, (room_id,), ) + + async def clear_partial_state_room(self, room_id: str) -> bool: + """Clears the partial state flag for a room. + + Args: + room_id: The room whose partial state flag is to be cleared. + + Returns: + `True` if the partial state flag has been cleared successfully. + + `False` if the partial state flag could not be cleared because the room + still contains events with partial state. + """ + try: + async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id: + await self.db_pool.runInteraction( + "clear_partial_state_room", + self._clear_partial_state_room_txn, + room_id, + un_partial_state_room_stream_id, + ) + return True + except self.db_pool.engine.module.IntegrityError as e: + # Assume that any `IntegrityError`s are due to partial state events. + logger.info( + "Exception while clearing lazy partial-state-room %s, retrying: %s", + room_id, + e, + ) + return False + + def _clear_partial_state_room_txn( + self, + txn: LoggingTransaction, + room_id: str, + un_partial_state_room_stream_id: int, + ) -> None: + DatabasePool.simple_delete_txn( + txn, + table="partial_state_rooms_servers", + keyvalues={"room_id": room_id}, + ) + DatabasePool.simple_delete_one_txn( + txn, + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + ) + self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_partial_state_servers_at_join, (room_id,) + ) + + DatabasePool.simple_insert_txn( + txn, + "un_partial_stated_room_stream", + { + "stream_id": un_partial_state_room_stream_id, + "instance_name": self._instance_name, + "room_id": room_id, + }, + ) + + # We now delete anything from `device_lists_remote_pending` with a + # stream ID less than the minimum + # `partial_state_rooms.device_lists_stream_id`, as we no longer need them. + device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn( + txn, + table="partial_state_rooms", + keyvalues={}, + retcol="MIN(device_lists_stream_id)", + allow_none=True, + ) + if device_lists_stream_id is None: + # There are no rooms being currently partially joined, so we delete everything. + txn.execute("DELETE FROM device_lists_remote_pending") + else: + sql = """ + DELETE FROM device_lists_remote_pending + WHERE stream_id <= ? + """ + txn.execute(sql, (device_lists_stream_id,)) diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 39e80f6f5b..131f357d04 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py
@@ -44,6 +44,4 @@ class RoomBatchStore(SQLBaseStore): table="event_to_state_groups", keyvalues={"event_id": event_id}, values={"state_group": state_group_id, "event_id": event_id}, - # Unique constraint on event_id so we don't have to lock - lock=False, ) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index af7bebee80..c801a93b5b 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -33,8 +33,8 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore -from synapse.storage.state import StateFilter from synapse.types import JsonDict, JsonMapping, StateMap +from synapse.types.state import StateFilter from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedList from synapse.util.cancellation import cancellable diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 698d6f7515..14ef5b040d 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -26,6 +26,14 @@ from typing import ( cast, ) +try: + # Figure out if ICU support is available for searching users. + import icu + + USE_ICU = True +except ModuleNotFoundError: + USE_ICU = False + from typing_extensions import TypedDict from synapse.api.errors import StoreError @@ -481,7 +489,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): table="user_directory", keyvalues={"user_id": user_id}, values={"display_name": display_name, "avatar_url": avatar_url}, - lock=False, # We're only inserter ) if isinstance(self.database_engine, PostgresEngine): @@ -511,7 +518,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): table="user_directory_search", keyvalues={"user_id": user_id}, values={"value": value}, - lock=False, # We're only inserter ) else: # This should be unreachable. @@ -888,7 +894,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): limited = len(results) > limit - return {"limited": limited, "results": results} + return {"limited": limited, "results": results[0:limit]} def _parse_query_sqlite(search_term: str) -> str: @@ -902,7 +908,7 @@ def _parse_query_sqlite(search_term: str) -> str: """ # Pull out the individual words, discarding any non-word characters. - results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + results = _parse_words(search_term) return " & ".join("(%s* OR %s)" % (result, result) for result in results) @@ -912,12 +918,63 @@ def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]: We use this so that we can add prefix matching, which isn't something that is supported by default. """ - - # Pull out the individual words, discarding any non-word characters. - results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + results = _parse_words(search_term) both = " & ".join("(%s:* | %s)" % (result, result) for result in results) exact = " & ".join("%s" % (result,) for result in results) prefix = " & ".join("%s:*" % (result,) for result in results) return both, exact, prefix + + +def _parse_words(search_term: str) -> List[str]: + """Split the provided search string into a list of its words. + + If support for ICU (International Components for Unicode) is available, use it. + Otherwise, fall back to using a regex to detect word boundaries. This latter + solution works well enough for most latin-based languages, but doesn't work as well + with other languages. + + Args: + search_term: The search string. + + Returns: + A list of the words in the search string. + """ + if USE_ICU: + return _parse_words_with_icu(search_term) + + return re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + +def _parse_words_with_icu(search_term: str) -> List[str]: + """Break down the provided search string into its individual words using ICU + (International Components for Unicode). + + Args: + search_term: The search string. + + Returns: + A list of the words in the search string. + """ + results = [] + breaker = icu.BreakIterator.createWordInstance(icu.Locale.getDefault()) + breaker.setText(search_term) + i = 0 + while True: + j = breaker.nextBoundary() + if j < 0: + break + + result = search_term[i:j] + + # libicu considers spaces and punctuation between words as words, but we don't + # want to include those in results as they would result in syntax errors in SQL + # queries (e.g. "foo bar" would result in the search query including "foo & & + # bar"). + if len(re.findall(r"([\w\-]+)", result, re.UNICODE)): + results.append(result) + + i = j + + return results diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 4a4ad0f492..d743282f13 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py
@@ -22,8 +22,8 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.engines import PostgresEngine -from synapse.storage.state import StateFilter from synapse.types import MutableStateMap, StateMap +from synapse.types.state import StateFilter from synapse.util.caches import intern_string if TYPE_CHECKING: diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index f8cfcaca83..1a7232b276 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -25,10 +25,10 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore -from synapse.storage.state import StateFilter from synapse.storage.types import Cursor from synapse.storage.util.sequence import build_sequence_generator from synapse.types import MutableStateMap, StateKey, StateMap +from synapse.types.state import StateFilter from synapse.util.caches.descriptors import cached from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.cancellation import cancellable diff --git a/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql new file mode 100644
index 0000000000..743196cfe3 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
@@ -0,0 +1,32 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Stream for notifying that a room has become un-partial-stated. +CREATE TABLE un_partial_stated_room_stream( + -- Position in the stream + stream_id BIGINT PRIMARY KEY NOT NULL, + + -- Which instance wrote this entry. + instance_name TEXT NOT NULL, + + -- Which room has been un-partial-stated. + room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE +); + +-- We want an index here because of the foreign key constraint: +-- upon deleting a room, the database needs to be able to check here. +-- This index is not unique because we can join a room multiple times in a server's lifetime, +-- so the same room could be un-partial-stated multiple times! +CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id); diff --git a/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres new file mode 100644
index 0000000000..c1aac0b385 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS un_partial_stated_room_stream_sequence; + +SELECT setval('un_partial_stated_room_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_room_stream +)); diff --git a/synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql b/synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql new file mode 100644
index 0000000000..afab1e4bb7 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/22_rebuild_user_dir_stats.sql
@@ -0,0 +1,29 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + -- Set up user directory staging tables. + (7322, 'populate_user_directory_createtables', '{}', NULL), + -- Run through each room and update the user directory according to who is in it. + (7322, 'populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'), + -- Insert all users into the user directory, if search_all_users is on. + (7322, 'populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'), + -- Clean up user directory staging tables. + (7322, 'populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users'), + -- Rebuild the room_stats_current and room_stats_state tables. + (7322, 'populate_stats_process_rooms', '{}', NULL), + -- Update the user_stats_current table. + (7322, 'populate_stats_process_users', '{}', NULL) +ON CONFLICT (update_name) DO NOTHING; diff --git a/synapse/types.py b/synapse/types/__init__.py
index f2d436ddc3..f2d436ddc3 100644 --- a/synapse/types.py +++ b/synapse/types/__init__.py
diff --git a/synapse/storage/state.py b/synapse/types/state.py
index 0004d955b4..0004d955b4 100644 --- a/synapse/storage/state.py +++ b/synapse/types/state.py
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 666f4b6895..1657459549 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py
@@ -16,6 +16,7 @@ import logging import math from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union +import attr from sortedcontainers import SortedDict from synapse.util import caches @@ -26,14 +27,41 @@ logger = logging.getLogger(__name__) EntityType = str +@attr.s(auto_attribs=True, frozen=True, slots=True) +class AllEntitiesChangedResult: + """Return type of `get_all_entities_changed`. + + Callers must check that there was a cache hit, via `result.hit`, before + using the entities in `result.entities`. + + This specifically does *not* implement helpers such as `__bool__` to ensure + that callers do the correct checks. + """ + + _entities: Optional[List[EntityType]] + + @property + def hit(self) -> bool: + return self._entities is not None + + @property + def entities(self) -> List[EntityType]: + assert self._entities is not None + return self._entities + + class StreamChangeCache: - """Keeps track of the stream positions of the latest change in a set of entities. + """ + Keeps track of the stream positions of the latest change in a set of entities. + + The entity will is typically a room ID or user ID, but can be any string. - Typically the entity will be a room or user id. + Can be queried for whether a specific entity has changed after a stream position + or for a list of changed entities after a stream position. See the individual + methods for more information. - Given a list of entities and a stream position, it will give a subset of - entities that may have changed since that position. If position key is too - old then the cache will simply return all given entities. + Only tracks to a maximum cache size, any position earlier than the earliest + known stream position must be treated as unknown. """ def __init__( @@ -45,16 +73,20 @@ class StreamChangeCache: ) -> None: self._original_max_size: int = max_size self._max_size = math.floor(max_size) - self._entity_to_key: Dict[EntityType, int] = {} - # map from stream id to the a set of entities which changed at that stream id. + # map from stream id to the set of entities which changed at that stream id. self._cache: SortedDict[int, Set[EntityType]] = SortedDict() + # map from entity to the stream ID of the latest change for that entity. + # + # Must be kept in sync with _cache. + self._entity_to_key: Dict[EntityType, int] = {} # the earliest stream_pos for which we can reliably answer # get_all_entities_changed. In other words, one less than the earliest # stream_pos for which we know _cache is valid. # self._earliest_known_stream_pos = current_stream_pos + self.name = name self.metrics = caches.register_cache( "cache", self.name, self._cache, resize_callback=self.set_cache_factor @@ -82,22 +114,46 @@ class StreamChangeCache: return False def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: - """Returns True if the entity may have been updated since stream_pos""" + """ + Returns True if the entity may have been updated after stream_pos. + + Args: + entity: The entity to check for changes. + stream_pos: The stream position to check for changes after. + + Return: + True if the entity may have been updated, this happens if: + * The given stream position is at or earlier than the earliest + known stream position. + * The given stream position is earlier than the latest change for + the entity. + + False otherwise: + * The entity is unknown. + * The given stream position is at or later than the latest change + for the entity. + """ assert isinstance(stream_pos, int) - if stream_pos < self._earliest_known_stream_pos: + # _cache is not valid at or before the earliest known stream position, so + # return that the entity has changed. + if stream_pos <= self._earliest_known_stream_pos: self.metrics.inc_misses() return True + # If the entity is unknown, it hasn't changed. latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: self.metrics.inc_hits() return False + # This is a known entity, return true if the stream position is earlier + # than the last change. if stream_pos < latest_entity_change_pos: self.metrics.inc_misses() return True + # Otherwise, the stream position is after the latest change: return false. self.metrics.inc_hits() return False @@ -105,23 +161,35 @@ class StreamChangeCache: self, entities: Collection[EntityType], stream_pos: int ) -> Union[Set[EntityType], FrozenSet[EntityType]]: """ - Returns subset of entities that have had new things since the given - position. Entities unknown to the cache will be returned. If the - position is too old it will just return the given list. + Returns the subset of the given entities that have had changes after the given position. + + Entities unknown to the cache will be returned. + + If the position is too old it will just return the given list. + + Args: + entities: Entities to check for changes. + stream_pos: The stream position to check for changes after. + + Return: + A subset of entities which have changed after the given stream position. + + This will be all entities if the given stream position is at or earlier + than the earliest known stream position. """ - changed_entities = self.get_all_entities_changed(stream_pos) - if changed_entities is not None: + cache_result = self.get_all_entities_changed(stream_pos) + if cache_result.hit: # We now do an intersection, trying to do so in the most efficient # way possible (some of these sets are *large*). First check in the - # given iterable is already set that we can reuse, otherwise we + # given iterable is already a set that we can reuse, otherwise we # create a set of the *smallest* of the two iterables and call # `intersection(..)` on it (this can be twice as fast as the reverse). if isinstance(entities, (set, frozenset)): - result = entities.intersection(changed_entities) - elif len(changed_entities) < len(entities): - result = set(changed_entities).intersection(entities) + result = entities.intersection(cache_result.entities) + elif len(cache_result.entities) < len(entities): + result = set(cache_result.entities).intersection(entities) else: - result = set(entities).intersection(changed_entities) + result = set(entities).intersection(cache_result.entities) self.metrics.inc_hits() else: result = set(entities) @@ -130,43 +198,76 @@ class StreamChangeCache: return result def has_any_entity_changed(self, stream_pos: int) -> bool: - """Returns if any entity has changed""" - assert type(stream_pos) is int + """ + Returns true if any entity has changed after the given stream position. - if not self._cache: - # If the cache is empty, nothing can have changed. - return False + Args: + stream_pos: The stream position to check for changes after. - if stream_pos >= self._earliest_known_stream_pos: - self.metrics.inc_hits() - return self._cache.bisect_right(stream_pos) < len(self._cache) - else: + Return: + True if any entity has changed after the given stream position or + if the given stream position is at or earlier than the earliest + known stream position. + + False otherwise. + """ + assert isinstance(stream_pos, int) + + # _cache is not valid at or before the earliest known stream position, so + # return that an entity has changed. + if stream_pos <= self._earliest_known_stream_pos: self.metrics.inc_misses() return True - def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]: - """Returns all entities that have had new things since the given - position. If the position is too old it will return None. + # If the cache is empty, nothing can have changed. + if not self._cache: + self.metrics.inc_misses() + return False + + self.metrics.inc_hits() + return stream_pos < self._cache.peekitem()[0] + + def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult: + """ + Returns all entities that have had changes after the given position. + + If the stream change cache does not go far enough back, i.e. the + position is too old, it will return None. Returns the entities in the order that they were changed. + + Args: + stream_pos: The stream position to check for changes after. + + Return: + A class indicating if we have the requested data cached, and if so + includes the entities in the order they were changed. """ - assert type(stream_pos) is int + assert isinstance(stream_pos, int) - if stream_pos < self._earliest_known_stream_pos: - return None + # _cache is not valid at or before the earliest known stream position, so + # return None to mark that it is unknown if an entity has changed. + if stream_pos <= self._earliest_known_stream_pos: + return AllEntitiesChangedResult(None) changed_entities: List[EntityType] = [] for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)): changed_entities.extend(self._cache[k]) - return changed_entities + return AllEntitiesChangedResult(changed_entities) def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: - """Informs the cache that the entity has been changed at the given - position. """ - assert type(stream_pos) is int + Informs the cache that the entity has been changed at the given position. + Args: + entity: The entity to mark as changed. + stream_pos: The stream position to update the entity to. + """ + assert isinstance(stream_pos, int) + + # For a change before _cache is valid (e.g. at or before the earliest known + # stream position) there's nothing to do. if stream_pos <= self._earliest_known_stream_pos: return @@ -189,6 +290,11 @@ class StreamChangeCache: self._evict() def _evict(self) -> None: + """ + Ensure the cache has not exceeded the maximum size. + + Evicts entries until it is at the maximum size. + """ # if the cache is too big, remove entries while len(self._cache) > self._max_size: k, r = self._cache.popitem(0) @@ -199,5 +305,12 @@ class StreamChangeCache: def get_max_pos_of_last_change(self, entity: EntityType) -> int: """Returns an upper bound of the stream id of the last change to an entity. + + Args: + entity: The entity to check. + + Return: + The stream position of the latest change for the given entity or + the earliest known stream position if the entitiy is unknown. """ return self._entity_to_key.get(entity, self._earliest_known_stream_pos) diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index a0606851f7..39fab4fe06 100644 --- a/synapse/util/httpresourcetree.py +++ b/synapse/util/httpresourcetree.py
@@ -15,7 +15,9 @@ import logging from typing import Dict -from twisted.web.resource import NoResource, Resource +from twisted.web.resource import Resource + +from synapse.http.server import UnrecognizedRequestResource logger = logging.getLogger(__name__) @@ -49,7 +51,7 @@ def create_resource_tree( for path_seg in full_path.split(b"/")[1:-1]: if path_seg not in last_resource.listNames(): # resource doesn't exist, so make a "dummy resource" - child_resource: Resource = NoResource() + child_resource: Resource = UnrecognizedRequestResource() last_resource.putChild(path_seg, child_resource) res_id = _resource_id(last_resource, path_seg) resource_mappings[res_id] = child_resource diff --git a/synapse/visibility.py b/synapse/visibility.py
index b443857571..e442de3173 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py
@@ -26,8 +26,8 @@ from synapse.events.utils import prune_event from synapse.logging.opentracing import trace from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore -from synapse.storage.state import StateFilter from synapse.types import RetentionPolicy, StateMap, get_domain_from_id +from synapse.types.state import StateFilter from synapse.util import Clock logger = logging.getLogger(__name__)