diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 00eae92052..f6d17c53b1 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -164,7 +164,14 @@ class AuthHandler(BaseHandler):
self.bcrypt_rounds = hs.config.bcrypt_rounds
+ # we can't use hs.get_module_api() here, because to do so will create an
+ # import loop.
+ #
+ # TODO: refactor this class to separate the lower-level stuff that
+ # ModuleApi can use from the higher-level stuff that uses ModuleApi, as
+ # better way to break the loop
account_handler = ModuleApi(hs, self)
+
self.password_providers = [
module(config=config, account_handler=account_handler)
for module, config in hs.config.password_providers
@@ -212,7 +219,7 @@ class AuthHandler(BaseHandler):
self._clock = self.hs.get_clock()
# Expire old UI auth sessions after a period of time.
- if hs.config.worker_app is None:
+ if hs.config.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b9d9098104..debb1b4f29 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019,2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Any, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api import errors
from synapse.api.constants import EventTypes
@@ -29,7 +29,10 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
+ Collection,
+ JsonDict,
StreamToken,
+ UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
@@ -41,13 +44,16 @@ from synapse.util.retryutils import NotRetryingDestination
from ._base import BaseHandler
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
MAX_DEVICE_DISPLAY_NAME_LEN = 100
class DeviceWorkerHandler(BaseHandler):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
@@ -105,7 +111,9 @@ class DeviceWorkerHandler(BaseHandler):
@trace
@measure_func("device.get_user_ids_changed")
- async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
+ async def get_user_ids_changed(
+ self, user_id: str, from_token: StreamToken
+ ) -> JsonDict:
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
"""
@@ -221,8 +229,8 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = possibly_changed & users_who_share_room
possibly_left = (possibly_changed | possibly_left) - users_who_share_room
else:
- possibly_joined = []
- possibly_left = []
+ possibly_joined = set()
+ possibly_left = set()
result = {"changed": list(possibly_joined), "left": list(possibly_left)}
@@ -230,7 +238,7 @@ class DeviceWorkerHandler(BaseHandler):
return result
- async def on_federation_query_user_devices(self, user_id):
+ async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query(
user_id
)
@@ -249,7 +257,7 @@ class DeviceWorkerHandler(BaseHandler):
class DeviceHandler(DeviceWorkerHandler):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.federation_sender = hs.get_federation_sender()
@@ -264,7 +272,7 @@ class DeviceHandler(DeviceWorkerHandler):
hs.get_distributor().observe("user_left_room", self.user_left_room)
- def _check_device_name_length(self, name: str):
+ def _check_device_name_length(self, name: Optional[str]):
"""
Checks whether a device name is longer than the maximum allowed length.
@@ -283,8 +291,11 @@ class DeviceHandler(DeviceWorkerHandler):
)
async def check_device_registered(
- self, user_id, device_id, initial_device_display_name=None
- ):
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ initial_device_display_name: Optional[str] = None,
+ ) -> str:
"""
If the given device has not been registered, register it with the
supplied display name.
@@ -292,12 +303,11 @@ class DeviceHandler(DeviceWorkerHandler):
If no device_id is supplied, we make one up.
Args:
- user_id (str): @user:id
- device_id (str | None): device id supplied by client
- initial_device_display_name (str | None): device display name from
- client
+ user_id: @user:id
+ device_id: device id supplied by client
+ initial_device_display_name: device display name from client
Returns:
- str: device id (generated if none was supplied)
+ device id (generated if none was supplied)
"""
self._check_device_name_length(initial_device_display_name)
@@ -316,15 +326,15 @@ class DeviceHandler(DeviceWorkerHandler):
# times in case of a clash.
attempts = 0
while attempts < 5:
- device_id = stringutils.random_string(10).upper()
+ new_device_id = stringutils.random_string(10).upper()
new_device = await self.store.store_device(
user_id=user_id,
- device_id=device_id,
+ device_id=new_device_id,
initial_device_display_name=initial_device_display_name,
)
if new_device:
- await self.notify_device_update(user_id, [device_id])
- return device_id
+ await self.notify_device_update(user_id, [new_device_id])
+ return new_device_id
attempts += 1
raise errors.StoreError(500, "Couldn't generate a device ID.")
@@ -433,7 +443,9 @@ class DeviceHandler(DeviceWorkerHandler):
@trace
@measure_func("notify_device_update")
- async def notify_device_update(self, user_id, device_ids):
+ async def notify_device_update(
+ self, user_id: str, device_ids: Collection[str]
+ ) -> None:
"""Notify that a user's device(s) has changed. Pokes the notifier, and
remote servers if the user is local.
"""
@@ -445,7 +457,7 @@ class DeviceHandler(DeviceWorkerHandler):
user_id
)
- hosts = set()
+ hosts = set() # type: Set[str]
if self.hs.is_mine_id(user_id):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
@@ -497,7 +509,7 @@ class DeviceHandler(DeviceWorkerHandler):
self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
- async def user_left_room(self, user, room_id):
+ async def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
@@ -505,8 +517,89 @@ class DeviceHandler(DeviceWorkerHandler):
# receive device updates. Mark this in DB.
await self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+ async def store_dehydrated_device(
+ self,
+ user_id: str,
+ device_data: JsonDict,
+ initial_device_display_name: Optional[str] = None,
+ ) -> str:
+ """Store a dehydrated device for a user. If the user had a previous
+ dehydrated device, it is removed.
+
+ Args:
+ user_id: the user that we are storing the device for
+ device_data: the dehydrated device information
+ initial_device_display_name: The display name to use for the device
+ Returns:
+ device id of the dehydrated device
+ """
+ device_id = await self.check_device_registered(
+ user_id, None, initial_device_display_name,
+ )
+ old_device_id = await self.store.store_dehydrated_device(
+ user_id, device_id, device_data
+ )
+ if old_device_id is not None:
+ await self.delete_device(user_id, old_device_id)
+ return device_id
+
+ async def get_dehydrated_device(
+ self, user_id: str
+ ) -> Optional[Tuple[str, JsonDict]]:
+ """Retrieve the information for a dehydrated device.
+
+ Args:
+ user_id: the user whose dehydrated device we are looking for
+ Returns:
+ a tuple whose first item is the device ID, and the second item is
+ the dehydrated device information
+ """
+ return await self.store.get_dehydrated_device(user_id)
+
+ async def rehydrate_device(
+ self, user_id: str, access_token: str, device_id: str
+ ) -> dict:
+ """Process a rehydration request from the user.
+
+ Args:
+ user_id: the user who is rehydrating the device
+ access_token: the access token used for the request
+ device_id: the ID of the device that will be rehydrated
+ Returns:
+ a dict containing {"success": True}
+ """
+ success = await self.store.remove_dehydrated_device(user_id, device_id)
+
+ if not success:
+ raise errors.NotFoundError()
+
+ # If the dehydrated device was successfully deleted (the device ID
+ # matched the stored dehydrated device), then modify the access
+ # token to use the dehydrated device's ID and copy the old device
+ # display name to the dehydrated device, and destroy the old device
+ # ID
+ old_device_id = await self.store.set_device_for_access_token(
+ access_token, device_id
+ )
+ old_device = await self.store.get_device(user_id, old_device_id)
+ await self.store.update_device(user_id, device_id, old_device["display_name"])
+ # can't call self.delete_device because that will clobber the
+ # access token so call the storage layer directly
+ await self.store.delete_device(user_id, old_device_id)
+ await self.store.delete_e2e_keys_by_device(
+ user_id=user_id, device_id=old_device_id
+ )
-def _update_device_from_client_ips(device, client_ips):
+ # tell everyone that the old device is gone and that the dehydrated
+ # device has a new display name
+ await self.notify_device_update(user_id, [old_device_id, device_id])
+
+ return {"success": True}
+
+
+def _update_device_from_client_ips(
+ device: Dict[str, Any], client_ips: Dict[Tuple[str, str], Dict[str, Any]]
+) -> None:
ip = client_ips.get((device["user_id"], device["device_id"]), {})
device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
@@ -514,7 +607,7 @@ def _update_device_from_client_ips(device, client_ips):
class DeviceListUpdater:
"Handles incoming device list updates from federation and updates the DB"
- def __init__(self, hs, device_handler):
+ def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
self.store = hs.get_datastore()
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
@@ -523,7 +616,9 @@ class DeviceListUpdater:
self._remote_edu_linearizer = Linearizer(name="remote_device_list")
# user_id -> list of updates waiting to be handled.
- self._pending_updates = {}
+ self._pending_updates = (
+ {}
+ ) # type: Dict[str, List[Tuple[str, str, Iterable[str], JsonDict]]]
# Recently seen stream ids. We don't bother keeping these in the DB,
# but they're useful to have them about to reduce the number of spurious
@@ -546,7 +641,9 @@ class DeviceListUpdater:
)
@trace
- async def incoming_device_list_update(self, origin, edu_content):
+ async def incoming_device_list_update(
+ self, origin: str, edu_content: JsonDict
+ ) -> None:
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
@@ -607,7 +704,7 @@ class DeviceListUpdater:
await self._handle_device_updates(user_id)
@measure_func("_incoming_device_list_update")
- async def _handle_device_updates(self, user_id):
+ async def _handle_device_updates(self, user_id: str) -> None:
"Actually handle pending updates."
with (await self._remote_edu_linearizer.queue(user_id)):
@@ -655,7 +752,9 @@ class DeviceListUpdater:
stream_id for _, stream_id, _, _ in pending_updates
)
- async def _need_to_do_resync(self, user_id, updates):
+ async def _need_to_do_resync(
+ self, user_id: str, updates: Iterable[Tuple[str, str, Iterable[str], JsonDict]]
+ ) -> bool:
"""Given a list of updates for a user figure out if we need to do a full
resync, or whether we have enough data that we can just apply the delta.
"""
@@ -686,7 +785,7 @@ class DeviceListUpdater:
return False
@trace
- async def _maybe_retry_device_resync(self):
+ async def _maybe_retry_device_resync(self) -> None:
"""Retry to resync device lists that are out of sync, except if another retry is
in progress.
"""
@@ -729,7 +828,7 @@ class DeviceListUpdater:
async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
- ) -> Optional[dict]:
+ ) -> Optional[JsonDict]:
"""Fetches all devices for a user and updates the device cache with them.
Args:
@@ -753,7 +852,7 @@ class DeviceListUpdater:
# it later.
await self.store.mark_remote_user_device_cache_as_stale(user_id)
- return
+ return None
except (RequestSendFailed, HttpResponseException) as e:
logger.warning(
"Failed to handle device list update for %s: %s", user_id, e,
@@ -770,12 +869,12 @@ class DeviceListUpdater:
# next time we get a device list update for this user_id.
# This makes it more likely that the device lists will
# eventually become consistent.
- return
+ return None
except FederationDeniedError as e:
set_tag("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
- return
+ return None
except Exception as e:
set_tag("error", True)
log_kv(
@@ -788,7 +887,7 @@ class DeviceListUpdater:
# it later.
await self.store.mark_remote_user_device_cache_as_stale(user_id)
- return
+ return None
log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
@@ -849,7 +948,7 @@ class DeviceListUpdater:
user_id: str,
master_key: Optional[Dict[str, Any]],
self_signing_key: Optional[Dict[str, Any]],
- ) -> list:
+ ) -> List[str]:
"""Process the given new master and self-signing key for the given remote user.
Args:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 62aa9a2da8..ad5683d251 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -46,6 +46,7 @@ class DirectoryHandler(BaseHandler):
self.config = hs.config
self.enable_room_list_search = hs.config.enable_room_list_search
self.require_membership = hs.config.require_membership_for_aliases
+ self.third_party_event_rules = hs.get_third_party_event_rules()
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -383,7 +384,7 @@ class DirectoryHandler(BaseHandler):
"""
creator = await self.store.get_room_alias_creator(alias.to_string())
- if creator is not None and creator == user_id:
+ if creator == user_id:
return True
# Resolve the alias to the corresponding room.
@@ -454,6 +455,15 @@ class DirectoryHandler(BaseHandler):
# per alias creation rule?
raise SynapseError(403, "Not allowed to publish room")
+ # Check if publishing is blocked by a third party module
+ allowed_by_third_party_rules = await (
+ self.third_party_event_rules.check_visibility_can_be_modified(
+ room_id, visibility
+ )
+ )
+ if not allowed_by_third_party_rules:
+ raise SynapseError(403, "Not allowed to publish room")
+
await self.store.set_room_is_public(room_id, making_public)
async def edit_published_appservice_room_list(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index dd40fd1299..611742ae72 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -496,6 +496,22 @@ class E2eKeysHandler:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
+ fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
+ if fallback_keys and isinstance(fallback_keys, dict):
+ log_kv(
+ {
+ "message": "Updating fallback_keys for device.",
+ "user_id": user_id,
+ "device_id": device_id,
+ }
+ )
+ await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
+ elif fallback_keys:
+ log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
+ else:
+ log_kv(
+ {"message": "Did not update fallback_keys", "reason": "no keys given"}
+ )
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1a8144405a..5ac2fc5656 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -3008,6 +3008,9 @@ class FederationHandler(BaseHandler):
elif event.internal_metadata.is_outlier():
return
+ # the event has been persisted so it should have a stream ordering.
+ assert event.internal_metadata.stream_ordering
+
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ee271e85e5..3e9a22e8f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -635,59 +635,6 @@ class EventCreationHandler:
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
- async def send_nonmember_event(
- self,
- requester: Requester,
- event: EventBase,
- context: EventContext,
- ratelimit: bool = True,
- ignore_shadow_ban: bool = False,
- ) -> int:
- """
- Persists and notifies local clients and federation of an event.
-
- Args:
- requester: The requester sending the event.
- event: The event to send.
- context: The context of the event.
- ratelimit: Whether to rate limit this send.
- ignore_shadow_ban: True if shadow-banned users should be allowed to
- send this event.
-
- Return:
- The stream_id of the persisted event.
-
- Raises:
- ShadowBanError if the requester has been shadow-banned.
- """
- if event.type == EventTypes.Member:
- raise SynapseError(
- 500, "Tried to send member event through non-member codepath"
- )
-
- if not ignore_shadow_ban and requester.shadow_banned:
- # We randomly sleep a bit just to annoy the requester.
- await self.clock.sleep(random.randint(1, 10))
- raise ShadowBanError()
-
- user = UserID.from_string(event.sender)
-
- assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
- if event.is_state():
- prev_event = await self.deduplicate_state_event(event, context)
- if prev_event is not None:
- logger.info(
- "Not bothering to persist state event %s duplicated by %s",
- event.event_id,
- prev_event.event_id,
- )
- return await self.store.get_stream_id_for_event(prev_event.event_id)
-
- return await self.handle_new_client_event(
- requester=requester, event=event, context=context, ratelimit=ratelimit
- )
-
async def deduplicate_state_event(
self, event: EventBase, context: EventContext
) -> Optional[EventBase]:
@@ -728,7 +675,7 @@ class EventCreationHandler:
"""
Creates an event, then sends it.
- See self.create_event and self.send_nonmember_event.
+ See self.create_event and self.handle_new_client_event.
Args:
requester: The requester sending the event.
@@ -738,9 +685,19 @@ class EventCreationHandler:
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
+ Returns:
+ The event, and its stream ordering (if state event deduplication happened,
+ the previous, duplicate event).
+
Raises:
ShadowBanError if the requester has been shadow-banned.
"""
+
+ if event_dict["type"] == EventTypes.Member:
+ raise SynapseError(
+ 500, "Tried to send member event through non-member codepath"
+ )
+
if not ignore_shadow_ban and requester.shadow_banned:
# We randomly sleep a bit just to annoy the requester.
await self.clock.sleep(random.randint(1, 10))
@@ -756,20 +713,27 @@ class EventCreationHandler:
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)
+ assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+ event.sender,
+ )
+
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
- stream_id = await self.send_nonmember_event(
- requester,
- event,
- context,
+ ev = await self.handle_new_client_event(
+ requester=requester,
+ event=event,
+ context=context,
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
- return event, stream_id
+
+ # we know it was persisted, so must have a stream ordering
+ assert ev.internal_metadata.stream_ordering
+ return ev, ev.internal_metadata.stream_ordering
@measure_func("create_new_client_event")
async def create_new_client_event(
@@ -843,8 +807,11 @@ class EventCreationHandler:
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
- ) -> int:
- """Processes a new event. This includes checking auth, persisting it,
+ ignore_shadow_ban: bool = False,
+ ) -> EventBase:
+ """Processes a new event.
+
+ This includes deduplicating, checking auth, persisting,
notifying users, sending to remote servers, etc.
If called from a worker will hit out to the master process for final
@@ -857,10 +824,39 @@ class EventCreationHandler:
ratelimit
extra_users: Any extra users to notify about event
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
+
Return:
- The stream_id of the persisted event.
+ If the event was deduplicated, the previous, duplicate, event. Otherwise,
+ `event`.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
+ # we don't apply shadow-banning to membership events here. Invites are blocked
+ # higher up the stack, and we allow shadow-banned users to send join and leave
+ # events as normal.
+ if (
+ event.type != EventTypes.Member
+ and not ignore_shadow_ban
+ and requester.shadow_banned
+ ):
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
+ if event.is_state():
+ prev_event = await self.deduplicate_state_event(event, context)
+ if prev_event is not None:
+ logger.info(
+ "Not bothering to persist state event %s duplicated by %s",
+ event.event_id,
+ prev_event.event_id,
+ )
+ return prev_event
+
if event.is_state() and (event.type, event.state_key) == (
EventTypes.Create,
"",
@@ -915,13 +911,13 @@ class EventCreationHandler:
)
stream_id = result["stream_id"]
event.internal_metadata.stream_ordering = stream_id
- return stream_id
+ return event
stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
- return stream_id
+ return event
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
@@ -1232,8 +1228,12 @@ class EventCreationHandler:
# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
- await self.send_nonmember_event(
- requester, event, context, ratelimit=False, ignore_shadow_ban=True,
+ await self.handle_new_client_event(
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=False,
+ ignore_shadow_ban=True,
)
return True
except ConsentNotGivenError:
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 19cd652675..05ac86e697 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -96,6 +96,7 @@ class OidcHandler:
self.hs = hs
self._callback_url = hs.config.oidc_callback_url # type: str
self._scopes = hs.config.oidc_scopes # type: List[str]
+ self._user_profile_method = hs.config.oidc_user_profile_method # type: str
self._client_auth = ClientAuth(
hs.config.oidc_client_id,
hs.config.oidc_client_secret,
@@ -196,11 +197,11 @@ class OidcHandler:
% (m["response_types_supported"],)
)
- # If the openid scope was not requested, we need a userinfo endpoint to fetch user infos
+ # Ensure there's a userinfo endpoint to fetch from if it is required.
if self._uses_userinfo:
if m.get("userinfo_endpoint") is None:
raise ValueError(
- 'provider has no "userinfo_endpoint", even though it is required because the "openid" scope is not requested'
+ 'provider has no "userinfo_endpoint", even though it is required'
)
else:
# If we're not using userinfo, we need a valid jwks to validate the ID token
@@ -220,8 +221,10 @@ class OidcHandler:
``access_token`` with the ``userinfo_endpoint``.
"""
- # Maybe that should be user-configurable and not inferred?
- return "openid" not in self._scopes
+ return (
+ "openid" not in self._scopes
+ or self._user_profile_method == "userinfo_endpoint"
+ )
async def load_metadata(self) -> OpenIDProviderMetadata:
"""Load and validate the provider metadata.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d5f7c78edf..d0530a446c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -185,6 +185,7 @@ class RoomCreationHandler(BaseHandler):
ShadowBanError if the requester is shadow-banned.
"""
user_id = requester.user.to_string()
+ assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)
# start by allocating a new room id
r = await self.store.get_room(old_room_id)
@@ -229,8 +230,8 @@ class RoomCreationHandler(BaseHandler):
)
# now send the tombstone
- await self.event_creation_handler.send_nonmember_event(
- requester, tombstone_event, tombstone_context
+ await self.event_creation_handler.handle_new_client_event(
+ requester=requester, event=tombstone_event, context=tombstone_context,
)
old_room_state = await tombstone_context.get_current_state_ids()
@@ -681,6 +682,15 @@ class RoomCreationHandler(BaseHandler):
creator_id=user_id, is_public=is_public, room_version=room_version,
)
+ # Check whether this visibility value is blocked by a third party module
+ allowed_by_third_party_rules = await (
+ self.third_party_event_rules.check_visibility_can_be_modified(
+ room_id, visibility
+ )
+ )
+ if not allowed_by_third_party_rules:
+ raise SynapseError(403, "Room visibility value not allowed.")
+
directory_handler = self.hs.get_handlers().directory_handler
if room_alias:
await directory_handler.create_association(
@@ -962,8 +972,6 @@ class RoomCreationHandler(BaseHandler):
try:
random_string = stringutils.random_string(18)
gen_room_id = RoomID(random_string, self.hs.hostname).to_string()
- if isinstance(gen_room_id, bytes):
- gen_room_id = gen_room_id.decode("utf-8")
await self.store.store_room(
room_id=gen_room_id,
room_creator_user_id=creator_id,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8feba8c90a..fd8114a64d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
from unpaddedbase64 import encode_base64
from synapse import types
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, AccountDataTypes, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -188,15 +188,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent=require_consent,
)
- # Check if this event matches the previous membership event for the user.
- duplicate = await self.event_creation_handler.deduplicate_state_event(
- event, context
- )
- if duplicate is not None:
- # Discard the new event since this membership change is a no-op.
- _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
- return duplicate.event_id, stream_id
-
prev_state_ids = await context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -221,7 +212,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)
- stream_id = await self.event_creation_handler.handle_new_client_event(
+ result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
@@ -231,7 +222,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
- return event.event_id, stream_id
+ # we know it was persisted, so should have a stream ordering
+ assert result_event.internal_metadata.stream_ordering
+ return result_event.event_id, result_event.internal_metadata.stream_ordering
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
@@ -247,7 +240,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
user_account_data, _ = await self.store.get_account_data_for_user(user_id)
# Copy direct message state if applicable
- direct_rooms = user_account_data.get("m.direct", {})
+ direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})
# Check which key this room is under
if isinstance(direct_rooms, dict):
@@ -258,7 +251,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Save back to user's m.direct account data
await self.store.add_account_data_for_user(
- user_id, "m.direct", direct_rooms
+ user_id, AccountDataTypes.DIRECT, direct_rooms
)
break
@@ -441,12 +434,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
same_membership = old_membership == effective_membership_state
same_sender = requester.user.to_string() == old_state.sender
if same_sender and same_membership and same_content:
- _, stream_id = await self.store.get_event_ordering(
- old_state.event_id
- )
+ # duplicate event.
+ # we know it was persisted, so must have a stream ordering.
+ assert old_state.internal_metadata.stream_ordering
return (
old_state.event_id,
- stream_id,
+ old_state.internal_metadata.stream_ordering,
)
if old_membership in ["ban", "leave"] and action == "kick":
@@ -642,7 +635,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
async def send_membership_event(
self,
- requester: Requester,
+ requester: Optional[Requester],
event: EventBase,
context: EventContext,
ratelimit: bool = True,
@@ -672,12 +665,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
else:
requester = types.create_requester(target_user)
- prev_event = await self.event_creation_handler.deduplicate_state_event(
- event, context
- )
- if prev_event is not None:
- return
-
prev_state_ids = await context.get_prev_state_ids()
if event.membership == Membership.JOIN:
if requester.is_guest:
@@ -1185,10 +1172,13 @@ class RoomMemberMasterHandler(RoomMemberHandler):
context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
- stream_id = await self.event_creation_handler.handle_new_client_event(
+ result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
- return event.event_id, stream_id
+ # we know it was persisted, so must have a stream ordering
+ assert result_event.internal_metadata.stream_ordering
+
+ return result_event.event_id, result_event.internal_metadata.stream_ordering
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 249ffe2a55..dc62b21c06 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,7 +49,7 @@ class StatsHandler:
# Guard to ensure we only process deltas one at a time
self._is_processing = False
- if hs.config.stats_enabled:
+ if self.stats_enabled and hs.config.run_background_tasks:
self.notifier.add_replication_callback(self.notify_new_event)
# We kick this off so that we don't have to wait for a change before
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bfe2583002..6fb8332f93 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tup
import attr
from prometheus_client import Counter
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
@@ -87,7 +87,7 @@ class SyncConfig:
class TimelineBatch:
prev_batch = attr.ib(type=StreamToken)
events = attr.ib(type=List[EventBase])
- limited = attr.ib(bool)
+ limited = attr.ib(type=bool)
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -201,6 +201,8 @@ class SyncResult:
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
+ device_unused_fallback_key_types: List of key types that have an unused fallback
+ key
groups: Group updates, if any
"""
@@ -213,6 +215,7 @@ class SyncResult:
to_device = attr.ib(type=List[JsonDict])
device_lists = attr.ib(type=DeviceLists)
device_one_time_keys_count = attr.ib(type=JsonDict)
+ device_unused_fallback_key_types = attr.ib(type=List[str])
groups = attr.ib(type=Optional[GroupsSyncResult])
def __bool__(self) -> bool:
@@ -457,8 +460,13 @@ class SyncHandler:
recents = []
if not limited or block_all_timeline:
+ prev_batch_token = now_token
+ if recents:
+ room_key = recents[0].internal_metadata.before
+ prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+
return TimelineBatch(
- events=recents, prev_batch=now_token, limited=False
+ events=recents, prev_batch=prev_batch_token, limited=False
)
filtering_factor = 2
@@ -1014,10 +1022,14 @@ class SyncHandler:
logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_key_counts = {} # type: JsonDict
+ unused_fallback_key_types = [] # type: List[str]
if device_id:
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
+ unused_fallback_key_types = await self.store.get_e2e_unused_fallback_key_types(
+ user_id, device_id
+ )
logger.debug("Fetching group data")
await self._generate_sync_entry_for_groups(sync_result_builder)
@@ -1041,6 +1053,7 @@ class SyncHandler:
device_lists=device_lists,
groups=sync_result_builder.groups,
device_one_time_keys_count=one_time_key_counts,
+ device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
@@ -1378,13 +1391,16 @@ class SyncHandler:
return set(), set(), set(), set()
ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
- "m.ignored_user_list", user_id=user_id
+ AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
)
+ # If there is ignored users account data and it matches the proper type,
+ # then use it.
+ ignored_users = frozenset() # type: FrozenSet[str]
if ignored_account_data:
- ignored_users = ignored_account_data.get("ignored_users", {}).keys()
- else:
- ignored_users = frozenset()
+ ignored_users_data = ignored_account_data.get("ignored_users", {})
+ if isinstance(ignored_users_data, dict):
+ ignored_users = frozenset(ignored_users_data.keys())
if since_token:
room_changes = await self._get_rooms_changed(
@@ -1478,7 +1494,7 @@ class SyncHandler:
return False
async def _get_rooms_changed(
- self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+ self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
) -> _RoomChanges:
"""Gets the the changes that have happened since the last sync.
"""
@@ -1690,7 +1706,7 @@ class SyncHandler:
return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
async def _get_all_rooms(
- self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+ self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
) -> _RoomChanges:
"""Returns entries for all rooms for the user.
@@ -1764,7 +1780,7 @@ class SyncHandler:
async def _generate_room_entry(
self,
sync_result_builder: "SyncResultBuilder",
- ignored_users: Set[str],
+ ignored_users: FrozenSet[str],
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
tags: Optional[Dict[str, Dict[str, Any]]],
|