diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3d83236b0c..0327fc57a4 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -280,7 +280,7 @@ class AuthHandler:
that it isn't stolen by re-authenticating them.
Args:
- requester: The user, as given by the access token
+ requester: The user making the request, according to the access token.
request: The request sent by the client.
@@ -565,7 +565,7 @@ class AuthHandler:
except LoginError as e:
# this step failed. Merge the error dict into the response
# so that the client can have another go.
- errordict = e.error_dict()
+ errordict = e.error_dict(self.hs.config)
creds = await self.store.get_completed_ui_auth_stages(session.session_id)
for f in flows:
@@ -1435,20 +1435,25 @@ class AuthHandler:
access_token: access token to be deleted
"""
- user_info = await self.auth.get_user_by_access_token(access_token)
+ token = await self.store.get_user_by_access_token(access_token)
+ if not token:
+ # At this point, the token should already have been fetched once by
+ # the caller, so this should not happen, unless of a race condition
+ # between two delete requests
+ raise SynapseError(HTTPStatus.UNAUTHORIZED, "Unrecognised access token")
await self.store.delete_access_token(access_token)
# see if any modules want to know about this
await self.password_auth_provider.on_logged_out(
- user_id=user_info.user_id,
- device_id=user_info.device_id,
+ user_id=token.user_id,
+ device_id=token.device_id,
access_token=access_token,
)
# delete pushers associated with this access token
- if user_info.token_id is not None:
+ if token.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
- user_info.user_id, (user_info.token_id,)
+ token.user_id, (token.token_id,)
)
async def delete_access_tokens_for_user(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c05a170c55..f5c586f657 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -74,6 +74,7 @@ class DeviceWorkerHandler:
self._state_storage = hs.get_storage_controllers().state
self._auth_handler = hs.get_auth_handler()
self.server_name = hs.hostname
+ self._msc3852_enabled = hs.config.experimental.msc3852_enabled
@trace
async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
@@ -118,8 +119,8 @@ class DeviceWorkerHandler:
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
- set_tag("device", device)
- set_tag("ips", ips)
+ set_tag("device", str(device))
+ set_tag("ips", str(ips))
return device
@@ -170,7 +171,7 @@ class DeviceWorkerHandler:
"""
set_tag("user_id", user_id)
- set_tag("from_token", from_token)
+ set_tag("from_token", str(from_token))
now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id)
@@ -747,7 +748,13 @@ def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
) -> None:
ip = client_ips.get((device["user_id"], device["device_id"]), {})
- device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
+ device.update(
+ {
+ "last_seen_user_agent": ip.get("user_agent"),
+ "last_seen_ts": ip.get("last_seen"),
+ "last_seen_ip": ip.get("ip"),
+ }
+ )
class DeviceListUpdater:
@@ -795,7 +802,7 @@ class DeviceListUpdater:
"""
set_tag("origin", origin)
- set_tag("edu_content", edu_content)
+ set_tag("edu_content", str(edu_content))
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 09a7a4b238..948f66a94d 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -30,7 +30,7 @@ from synapse.api.errors import (
from synapse.appservice import ApplicationService
from synapse.module_api import NOT_SPAM
from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, RoomAlias, get_domain_from_id
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -133,7 +133,7 @@ class DirectoryHandler:
else:
# Server admins are not subject to the same constraints as normal
# users when creating an alias (e.g. being in the room).
- is_admin = await self.auth.is_server_admin(requester.user)
+ is_admin = await self.auth.is_server_admin(requester)
if (self.require_membership and check_membership) and not is_admin:
rooms_for_user = await self.store.get_rooms_for_user(user_id)
@@ -197,7 +197,7 @@ class DirectoryHandler:
user_id = requester.user.to_string()
try:
- can_delete = await self._user_can_delete_alias(room_alias, user_id)
+ can_delete = await self._user_can_delete_alias(room_alias, requester)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown room alias")
@@ -400,7 +400,9 @@ class DirectoryHandler:
# either no interested services, or no service with an exclusive lock
return True
- async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
+ async def _user_can_delete_alias(
+ self, alias: RoomAlias, requester: Requester
+ ) -> bool:
"""Determine whether a user can delete an alias.
One of the following must be true:
@@ -413,7 +415,7 @@ class DirectoryHandler:
"""
creator = await self.store.get_room_alias_creator(alias.to_string())
- if creator == user_id:
+ if creator == requester.user.to_string():
return True
# Resolve the alias to the corresponding room.
@@ -422,9 +424,7 @@ class DirectoryHandler:
if not room_id:
return False
- return await self.auth.check_can_change_room_list(
- room_id, UserID.from_string(user_id)
- )
+ return await self.auth.check_can_change_room_list(room_id, requester)
async def edit_published_room_list(
self, requester: Requester, room_id: str, visibility: str
@@ -463,7 +463,7 @@ class DirectoryHandler:
raise SynapseError(400, "Unknown room")
can_change_room_list = await self.auth.check_can_change_room_list(
- room_id, requester.user
+ room_id, requester
)
if not can_change_room_list:
raise AuthError(
@@ -528,10 +528,8 @@ class DirectoryHandler:
Get a list of the aliases that currently point to this room on this server
"""
# allow access to server admins and current members of the room
- is_admin = await self.auth.is_server_admin(requester.user)
+ is_admin = await self.auth.is_server_admin(requester)
if not is_admin:
- await self.auth.check_user_in_room_or_world_readable(
- room_id, requester.user.to_string()
- )
+ await self.auth.check_user_in_room_or_world_readable(room_id, requester)
return await self.store.get_aliases_for_room(room_id)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 84c28c480e..c938339ddd 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -138,8 +138,8 @@ class E2eKeysHandler:
else:
remote_queries[user_id] = device_ids
- set_tag("local_key_query", local_query)
- set_tag("remote_key_query", remote_queries)
+ set_tag("local_key_query", str(local_query))
+ set_tag("remote_key_query", str(remote_queries))
# First get local devices.
# A map of destination -> failure response.
@@ -343,7 +343,7 @@ class E2eKeysHandler:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
- set_tag("reason", failure)
+ set_tag("reason", str(failure))
return
@@ -405,7 +405,7 @@ class E2eKeysHandler:
Returns:
A map from user_id -> device_id -> device details
"""
- set_tag("local_query", query)
+ set_tag("local_query", str(query))
local_query: List[Tuple[str, Optional[str]]] = []
result_dict: Dict[str, Dict[str, dict]] = {}
@@ -477,8 +477,8 @@ class E2eKeysHandler:
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = one_time_keys
- set_tag("local_key_query", local_query)
- set_tag("remote_key_query", remote_queries)
+ set_tag("local_key_query", str(local_query))
+ set_tag("remote_key_query", str(remote_queries))
results = await self.store.claim_e2e_one_time_keys(local_query)
@@ -508,7 +508,7 @@ class E2eKeysHandler:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
- set_tag("reason", failure)
+ set_tag("reason", str(failure))
await make_deferred_yieldable(
defer.gatherResults(
@@ -611,7 +611,7 @@ class E2eKeysHandler:
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
- set_tag("one_time_key_counts", result)
+ set_tag("one_time_key_counts", str(result))
return {"one_time_key_counts": result}
async def _upload_one_time_keys_for_user(
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 446f509bdc..28dc08c22a 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, Optional
+from typing import TYPE_CHECKING, Dict, Optional, cast
from typing_extensions import Literal
@@ -97,7 +97,7 @@ class E2eRoomKeysHandler:
user_id, version, room_id, session_id
)
- log_kv(results)
+ log_kv(cast(JsonDict, results))
return results
@trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 11a005f0bf..4bb4d09d4a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,7 @@ from typing import (
)
import attr
+from prometheus_client import Histogram
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -59,6 +60,7 @@ from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.replication.http.federation import (
@@ -78,6 +80,29 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+# Added to debug performance and track progress on optimizations
+backfill_processing_before_timer = Histogram(
+ "synapse_federation_backfill_processing_before_time_seconds",
+ "sec",
+ [],
+ buckets=(
+ 0.1,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 7.5,
+ 10.0,
+ 15.0,
+ 20.0,
+ 30.0,
+ 40.0,
+ 60.0,
+ 80.0,
+ "+Inf",
+ ),
+)
+
def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state
@@ -137,6 +162,7 @@ class FederationHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
+ self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
@@ -180,6 +206,7 @@ class FederationHandler:
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
)
+ @trace
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
@@ -195,12 +222,39 @@ class FederationHandler:
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
+ # Starting the processing time here so we can include the room backfill
+ # linearizer lock queue in the timing
+ processing_start_time = self.clock.time_msec()
+
async with self._room_backfill.queue(room_id):
- return await self._maybe_backfill_inner(room_id, current_depth, limit)
+ return await self._maybe_backfill_inner(
+ room_id,
+ current_depth,
+ limit,
+ processing_start_time=processing_start_time,
+ )
async def _maybe_backfill_inner(
- self, room_id: str, current_depth: int, limit: int
+ self,
+ room_id: str,
+ current_depth: int,
+ limit: int,
+ *,
+ processing_start_time: int,
) -> bool:
+ """
+ Checks whether the `current_depth` is at or approaching any backfill
+ points in the room and if so, will backfill. We only care about
+ checking backfill points that happened before the `current_depth`
+ (meaning less than or equal to the `current_depth`).
+
+ Args:
+ room_id: The room to backfill in.
+ current_depth: The depth to check at for any upcoming backfill points.
+ limit: The max number of events to request from the remote federated server.
+ processing_start_time: The time when `maybe_backfill` started
+ processing. Only used for timing.
+ """
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
@@ -368,6 +422,14 @@ class FederationHandler:
logger.debug(
"_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "extremities_to_request",
+ str(extremities_to_request),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+ str(len(extremities_to_request)),
+ )
# Now we need to decide which hosts to hit first.
@@ -423,6 +485,11 @@ class FederationHandler:
return False
+ processing_end_time = self.clock.time_msec()
+ backfill_processing_before_timer.observe(
+ (processing_end_time - processing_start_time) / 1000
+ )
+
success = await try_backfill(likely_domains)
if success:
return True
@@ -546,9 +613,9 @@ class FederationHandler:
)
if ret.partial_state:
- # TODO(faster_joins): roll this back if we don't manage to start the
- # background resync (eg process_remote_join fails)
- # https://github.com/matrix-org/synapse/issues/12998
+ # Mark the room as having partial state.
+ # The background process is responsible for unmarking this flag,
+ # even if the join fails.
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
try:
@@ -574,17 +641,21 @@ class FederationHandler:
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
-
- if ret.partial_state:
- # Kick off the process of asynchronously fetching the state for this
- # room.
- run_as_background_process(
- desc="sync_partial_state_room",
- func=self._sync_partial_state_room,
- initial_destination=origin,
- other_destinations=ret.servers_in_room,
- room_id=room_id,
- )
+ finally:
+ # Always kick off the background process that asynchronously fetches
+ # state for the room.
+ # If the join failed, the background process is responsible for
+ # cleaning up — including unmarking the room as a partial state room.
+ if ret.partial_state:
+ # Kick off the process of asynchronously fetching the state for this
+ # room.
+ run_as_background_process(
+ desc="sync_partial_state_room",
+ func=self._sync_partial_state_room,
+ initial_destination=origin,
+ other_destinations=ret.servers_in_room,
+ room_id=room_id,
+ )
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
@@ -748,6 +819,23 @@ class FederationHandler:
# (and return a 404 otherwise)
room_version = await self.store.get_room_version(room_id)
+ if await self.store.is_partial_state_room(room_id):
+ # If our server is still only partially joined, we can't give a complete
+ # response to /make_join, so return a 404 as we would if we weren't in the
+ # room at all.
+ # The main reason we can't respond properly is that we need to know about
+ # the auth events for the join event that we would return.
+ # We also should not bother entertaining the /make_join since we cannot
+ # handle the /send_join.
+ logger.info(
+ "Rejecting /make_join to %s because it's a partial state room", room_id
+ )
+ raise SynapseError(
+ 404,
+ "Unable to handle /make_join right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
# now check that we are *still* in the room
is_in_room = await self._event_auth_handler.check_host_in_room(
room_id, self.server_name
@@ -1071,6 +1159,8 @@ class FederationHandler:
return event
+ @trace
+ @tag_args
async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
"""Returns the state at the event. i.e. not including said event."""
event = await self.store.get_event(event_id, check_room_id=room_id)
@@ -1552,15 +1642,16 @@ class FederationHandler:
# Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes.
+ destinations: Collection[str]
if initial_destination is not None:
# Move `initial_destination` to the front of the list.
destinations = list(other_destinations)
if initial_destination in destinations:
destinations.remove(initial_destination)
destinations = [initial_destination] + destinations
- destination_iter = itertools.cycle(destinations)
else:
- destination_iter = itertools.cycle(other_destinations)
+ destinations = other_destinations
+ destination_iter = itertools.cycle(destinations)
# `destination` is the current remote homeserver we're pulling from.
destination = next(destination_iter)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index a5f4ce7c8a..048c4111f6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -29,7 +29,7 @@ from typing import (
Tuple,
)
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from synapse import event_auth
from synapse.api.constants import (
@@ -59,6 +59,13 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import (
+ SynapseTags,
+ set_tag,
+ start_active_span,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
@@ -91,6 +98,36 @@ soft_failed_event_counter = Counter(
"Events received over federation that we marked as soft_failed",
)
+# Added to debug performance and track progress on optimizations
+backfill_processing_after_timer = Histogram(
+ "synapse_federation_backfill_processing_after_time_seconds",
+ "sec",
+ [],
+ buckets=(
+ 0.1,
+ 0.25,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 7.5,
+ 10.0,
+ 15.0,
+ 20.0,
+ 25.0,
+ 30.0,
+ 40.0,
+ 50.0,
+ 60.0,
+ 80.0,
+ 100.0,
+ 120.0,
+ 150.0,
+ 180.0,
+ "+Inf",
+ ),
+)
+
class FederationEventHandler:
"""Handles events that originated from federation.
@@ -278,7 +315,8 @@ class FederationEventHandler:
)
try:
- await self._process_received_pdu(origin, pdu, state_ids=None)
+ context = await self._state_handler.compute_event_context(pdu)
+ await self._process_received_pdu(origin, pdu, context)
except PartialStateConflictError:
# The room was un-partial stated while we were processing the PDU.
# Try once more, with full state this time.
@@ -286,7 +324,8 @@ class FederationEventHandler:
"Room %s was un-partial stated while processing the PDU, trying again.",
room_id,
)
- await self._process_received_pdu(origin, pdu, state_ids=None)
+ context = await self._state_handler.compute_event_context(pdu)
+ await self._process_received_pdu(origin, pdu, context)
async def on_send_membership_event(
self, origin: str, event: EventBase
@@ -316,6 +355,7 @@ class FederationEventHandler:
The event and context of the event after inserting it into the room graph.
Raises:
+ RuntimeError if any prev_events are missing
SynapseError if the event is not accepted into the room
PartialStateConflictError if the room was un-partial stated in between
computing the state at the event and persisting it. The caller should
@@ -376,7 +416,7 @@ class FederationEventHandler:
# need to.
await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
- await self._check_for_soft_fail(event, None, origin=origin)
+ await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context)
return event, context
@@ -406,6 +446,7 @@ class FederationEventHandler:
prev_member_event,
)
+ @trace
async def process_remote_join(
self,
origin: str,
@@ -534,32 +575,36 @@ class FederationEventHandler:
#
# This is the same operation as we do when we receive a regular event
# over federation.
- state_ids = await self._resolve_state_at_missing_prevs(destination, event)
-
- # build a new state group for it if need be
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event=state_ids,
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ destination, event
)
if context.partial_state:
# this can happen if some or all of the event's prev_events still have
- # partial state - ie, an event has an earlier stream_ordering than one
- # or more of its prev_events, so we de-partial-state it before its
- # prev_events.
+ # partial state. We were careful to only pick events from the db without
+ # partial-state prev events, so that implies that a prev event has
+ # been persisted (with partial state) since we did the query.
#
- # TODO(faster_joins): we probably need to be more intelligent, and
- # exclude partial-state prev_events from consideration
- # https://github.com/matrix-org/synapse/issues/13001
+ # So, let's just ignore `event` for now; when we re-run the db query
+ # we should instead get its partial-state prev event, which we will
+ # de-partial-state, and then come back to event.
logger.warning(
- "%s still has partial state: can't de-partial-state it yet",
+ "%s still has prev_events with partial state: can't de-partial-state it yet",
event.event_id,
)
return
+
+ # since the state at this event has changed, we should now re-evaluate
+ # whether it should have been rejected. We must already have all of the
+ # auth events (from last time we went round this path), so there is no
+ # need to pass the origin.
+ await self._check_event_auth(None, event, context)
+
await self._store.update_state_for_partial_state_event(event, context)
self._state_storage_controller.notify_event_un_partial_stated(
event.event_id
)
+ @trace
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> None:
@@ -589,21 +634,23 @@ class FederationEventHandler:
if not events:
return
- # if there are any events in the wrong room, the remote server is buggy and
- # should not be trusted.
- for ev in events:
- if ev.room_id != room_id:
- raise InvalidResponseError(
- f"Remote server {dest} returned event {ev.event_id} which is in "
- f"room {ev.room_id}, when we were backfilling in {room_id}"
- )
+ with backfill_processing_after_timer.time():
+ # if there are any events in the wrong room, the remote server is buggy and
+ # should not be trusted.
+ for ev in events:
+ if ev.room_id != room_id:
+ raise InvalidResponseError(
+ f"Remote server {dest} returned event {ev.event_id} which is in "
+ f"room {ev.room_id}, when we were backfilling in {room_id}"
+ )
- await self._process_pulled_events(
- dest,
- events,
- backfilled=True,
- )
+ await self._process_pulled_events(
+ dest,
+ events,
+ backfilled=True,
+ )
+ @trace
async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
) -> None:
@@ -704,8 +751,9 @@ class FederationEventHandler:
logger.info("Got %d prev_events", len(missing_events))
await self._process_pulled_events(origin, missing_events, backfilled=False)
+ @trace
async def _process_pulled_events(
- self, origin: str, events: Iterable[EventBase], backfilled: bool
+ self, origin: str, events: Collection[EventBase], backfilled: bool
) -> None:
"""Process a batch of events we have pulled from a remote server
@@ -720,6 +768,15 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str([event.event_id for event in events]),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
@@ -742,6 +799,8 @@ class FederationEventHandler:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
+ @trace
+ @tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
@@ -793,7 +852,7 @@ class FederationEventHandler:
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
- "Ignoring received event %s which we have already seen",
+ "_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
@@ -806,29 +865,56 @@ class FederationEventHandler:
return
try:
- state_ids = await self._resolve_state_at_missing_prevs(origin, event)
- # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
- # not return partial state
- # https://github.com/matrix-org/synapse/issues/13002
+ try:
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ origin, event
+ )
+ await self._process_received_pdu(
+ origin,
+ event,
+ context,
+ backfilled=backfilled,
+ )
+ except PartialStateConflictError:
+ # The room was un-partial stated while we were processing the event.
+ # Try once more, with full state this time.
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ origin, event
+ )
- await self._process_received_pdu(
- origin, event, state_ids=state_ids, backfilled=backfilled
- )
+ # We ought to have full state now, barring some unlikely race where we left and
+ # rejoned the room in the background.
+ if context.partial_state:
+ raise AssertionError(
+ f"Event {event.event_id} still has a partial resolved state "
+ f"after room {event.room_id} was un-partial stated"
+ )
+
+ await self._process_received_pdu(
+ origin,
+ event,
+ context,
+ backfilled=backfilled,
+ )
except FederationError as e:
if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
raise
- async def _resolve_state_at_missing_prevs(
+ @trace
+ async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
- ) -> Optional[StateMap[str]]:
- """Calculate the state at an event with missing prev_events.
+ ) -> EventContext:
+ """Build an EventContext structure for a non-outlier event whose prev_events may
+ be missing.
- This is used when we have pulled a batch of events from a remote server, and
- still don't have all the prev_events.
+ This is used when we have pulled a batch of events from a remote server, and may
+ not have all the prev_events.
- If we already have all the prev_events for `event`, this method does nothing.
+ To build an EventContext, we need to calculate the state before the event. If we
+ already have all the prev_events for `event`, we can simply use the state after
+ the prev_events to calculate the state before `event`.
Otherwise, the missing prevs become new backwards extremities, and we fall back
to asking the remote server for the state after each missing `prev_event`,
@@ -849,8 +935,7 @@ class FederationEventHandler:
event: an event to check for missing prevs.
Returns:
- if we already had all the prev events, `None`. Otherwise, returns
- the event ids of the state at `event`.
+ The event context.
Raises:
FederationError if we fail to get the state from the remote server after any
@@ -864,7 +949,7 @@ class FederationEventHandler:
missing_prevs = prevs - seen
if not missing_prevs:
- return None
+ return await self._state_handler.compute_event_context(event)
logger.info(
"Event %s is missing prev_events %s: calculating state for a "
@@ -876,9 +961,15 @@ class FederationEventHandler:
# resolve them to find the correct state at the current event.
try:
+ # Determine whether we may be about to retrieve partial state
+ # Events may be un-partial stated right after we compute the partial state
+ # flag, but that's okay, as long as the flag errs on the conservative side.
+ partial_state_flags = await self._store.get_partial_state_events(seen)
+ partial_state = any(partial_state_flags.values())
+
# Get the state of the events we know about
ours = await self._state_storage_controller.get_state_groups_ids(
- room_id, seen
+ room_id, seen, await_full_state=False
)
# state_maps is a list of mappings from (type, state_key) to event_id
@@ -924,8 +1015,12 @@ class FederationEventHandler:
"We can't get valid state history.",
affected=event_id,
)
- return state_map
+ return await self._state_handler.compute_event_context(
+ event, state_ids_before_event=state_map, partial_state=partial_state
+ )
+ @trace
+ @tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
@@ -965,10 +1060,10 @@ class FederationEventHandler:
logger.debug("Fetching %i events from cache/store", len(desired_events))
have_events = await self._store.have_seen_events(room_id, desired_events)
- missing_desired_events = desired_events - have_events
+ missing_desired_event_ids = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
- len(missing_desired_events),
+ len(missing_desired_event_ids),
len(have_events),
)
@@ -980,13 +1075,30 @@ class FederationEventHandler:
# already have a bunch of the state events. It would be nice if the
# federation api gave us a way of finding out which we actually need.
- missing_auth_events = set(auth_event_ids) - have_events
- missing_auth_events.difference_update(
- await self._store.have_seen_events(room_id, missing_auth_events)
+ missing_auth_event_ids = set(auth_event_ids) - have_events
+ missing_auth_event_ids.difference_update(
+ await self._store.have_seen_events(room_id, missing_auth_event_ids)
)
- logger.debug("We are also missing %i auth events", len(missing_auth_events))
+ logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
+
+ missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
- missing_events = missing_desired_events | missing_auth_events
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+ str(missing_auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+ str(len(missing_auth_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+ str(missing_desired_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+ str(len(missing_desired_event_ids)),
+ )
# Making an individual request for each of 1000s of events has a lot of
# overhead. On the other hand, we don't really want to fetch all of the events
@@ -997,13 +1109,13 @@ class FederationEventHandler:
#
# TODO: might it be better to have an API which lets us do an aggregate event
# request
- if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+ if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
logger.debug("Requesting complete state from remote")
await self._get_state_and_persist(destination, room_id, event_id)
else:
- logger.debug("Fetching %i events from remote", len(missing_events))
+ logger.debug("Fetching %i events from remote", len(missing_event_ids))
await self._get_events_and_persist(
- destination=destination, room_id=room_id, event_ids=missing_events
+ destination=destination, room_id=room_id, event_ids=missing_event_ids
)
# We now need to fill out the state map, which involves fetching the
@@ -1060,6 +1172,14 @@ class FederationEventHandler:
event_id,
failed_to_fetch,
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+ str(failed_to_fetch),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+ str(len(failed_to_fetch)),
+ )
if remote_event.is_state() and remote_event.rejected_reason is None:
state_map[
@@ -1068,6 +1188,8 @@ class FederationEventHandler:
return state_map
+ @trace
+ @tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1089,11 +1211,12 @@ class FederationEventHandler:
destination=destination, room_id=room_id, event_ids=(event_id,)
)
+ @trace
async def _process_received_pdu(
self,
origin: str,
event: EventBase,
- state_ids: Optional[StateMap[str]],
+ context: EventContext,
backfilled: bool = False,
) -> None:
"""Called when we have a new non-outlier event.
@@ -1115,24 +1238,18 @@ class FederationEventHandler:
event: event to be persisted
- state_ids: Normally None, but if we are handling a gap in the graph
- (ie, we are missing one or more prev_events), the resolved state at the
- event. Must not be partial state.
+ context: The `EventContext` to persist the event with.
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
PartialStateConflictError: if the room was un-partial stated in between
- computing the state at the event and persisting it. The caller should retry
- exactly once in this case. Will never be raised if `state_ids` is provided.
+ computing the state at the event and persisting it. The caller should
+ recompute `context` and retry exactly once when this happens.
"""
logger.debug("Processing event: %s", event)
assert not event.internal_metadata.outlier
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event=state_ids,
- )
try:
await self._check_event_auth(origin, event, context)
except AuthError as e:
@@ -1144,7 +1261,7 @@ class FederationEventHandler:
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
- await self._check_for_soft_fail(event, state_ids, origin=origin)
+ await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context, backfilled)
@@ -1245,6 +1362,7 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
+ @trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
@@ -1276,7 +1394,7 @@ class FederationEventHandler:
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_MARKER_INSERTION
+ EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
)
if insertion_event_id is None:
@@ -1329,6 +1447,55 @@ class FederationEventHandler:
marker_event,
)
+ async def backfill_event_id(
+ self, destination: str, room_id: str, event_id: str
+ ) -> EventBase:
+ """Backfill a single event and persist it as a non-outlier which means
+ we also pull in all of the state and auth events necessary for it.
+
+ Args:
+ destination: The homeserver to pull the given event_id from.
+ room_id: The room where the event is from.
+ event_id: The event ID to backfill.
+
+ Raises:
+ FederationError if we are unable to find the event from the destination
+ """
+ logger.info(
+ "backfill_event_id: event_id=%s from destination=%s", event_id, destination
+ )
+
+ room_version = await self._store.get_room_version(room_id)
+
+ event_from_response = await self._federation_client.get_pdu(
+ [destination],
+ event_id,
+ room_version,
+ )
+
+ if not event_from_response:
+ raise FederationError(
+ "ERROR",
+ 404,
+ "Unable to find event_id=%s from destination=%s to backfill."
+ % (event_id, destination),
+ affected=event_id,
+ )
+
+ # Persist the event we just fetched, including pulling all of the state
+ # and auth events to de-outlier it. This also sets up the necessary
+ # `state_groups` for the event.
+ await self._process_pulled_events(
+ destination,
+ [event_from_response],
+ # Prevent notifications going to clients
+ backfilled=True,
+ )
+
+ return event_from_response
+
+ @trace
+ @tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
@@ -1374,6 +1541,7 @@ class FederationEventHandler:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
+ @trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
@@ -1392,6 +1560,16 @@ class FederationEventHandler:
"""
event_map = {event.event_id: event for event in events}
+ event_ids = event_map.keys()
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str(event_ids),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(event_ids)),
+ )
+
# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
@@ -1508,14 +1686,17 @@ class FederationEventHandler:
backfilled=True,
)
+ @trace
async def _check_event_auth(
- self, origin: str, event: EventBase, context: EventContext
+ self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
"""
Checks whether an event should be rejected (for failing auth checks).
Args:
- origin: The host the event originates from.
+ origin: The host the event originates from. This is used to fetch
+ any missing auth events. It can be set to None, but only if we are
+ sure that we already have all the auth events.
event: The event itself.
context:
The event context.
@@ -1544,6 +1725,14 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+ str([ev.event_id for ev in claimed_auth_events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+ str(len(claimed_auth_events)),
+ )
# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1641,6 +1830,7 @@ class FederationEventHandler:
)
context.rejected = RejectedReason.AUTH_ERROR
+ @trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
@@ -1658,17 +1848,27 @@ class FederationEventHandler:
async def _check_for_soft_fail(
self,
event: EventBase,
- state_ids: Optional[StateMap[str]],
+ context: EventContext,
origin: str,
) -> None:
"""Checks if we should soft fail the event; if so, marks the event as
such.
+ Does nothing for events in rooms with partial state, since we may not have an
+ accurate membership event for the sender in the current state.
+
Args:
event
- state_ids: The state at the event if we don't have all the event's prev events
+ context: The `EventContext` which we are about to persist the event with.
origin: The host the event originates from.
"""
+ if await self._store.is_partial_state_room(event.room_id):
+ # We might not know the sender's membership in the current state, so don't
+ # soft fail anything. Even if we do have a membership for the sender in the
+ # current state, it may have been derived from state resolution between
+ # partial and full state and may not be accurate.
+ return
+
extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
extrem_ids = set(extrem_ids_list)
prev_event_ids = set(event.prev_event_ids())
@@ -1685,11 +1885,15 @@ class FederationEventHandler:
auth_types = auth_types_for_event(room_version_obj, event)
# Calculate the "current state".
- if state_ids is not None:
- # If we're explicitly given the state then we won't have all the
- # prev events, and so we have a gap in the graph. In this case
- # we want to be a little careful as we might have been down for
- # a while and have an incorrect view of the current state,
+ seen_event_ids = await self._store.have_events_in_timeline(prev_event_ids)
+ has_missing_prevs = bool(prev_event_ids - seen_event_ids)
+ if has_missing_prevs:
+ # We don't have all the prev_events of this event, which means we have a
+ # gap in the graph, and the new event is going to become a new backwards
+ # extremity.
+ #
+ # In this case we want to be a little careful as we might have been
+ # down for a while and have an incorrect view of the current state,
# however we still want to do checks as gaps are easy to
# maliciously manufacture.
#
@@ -1702,6 +1906,7 @@ class FederationEventHandler:
event.room_id, extrem_ids
)
state_sets: List[StateMap[str]] = list(state_sets_d.values())
+ state_ids = await context.get_prev_state_ids()
state_sets.append(state_ids)
current_state_ids = (
await self._state_resolution_handler.resolve_events_with_store(
@@ -1751,7 +1956,7 @@ class FederationEventHandler:
event.internal_metadata.soft_failed = True
async def _load_or_fetch_auth_events_for_event(
- self, destination: str, event: EventBase
+ self, destination: Optional[str], event: EventBase
) -> Collection[EventBase]:
"""Fetch this event's auth_events, from database or remote
@@ -1767,12 +1972,19 @@ class FederationEventHandler:
Args:
destination: where to send the /event_auth request. Typically the server
that sent us `event` in the first place.
+
+ If this is None, no attempt is made to load any missing auth events:
+ rather, an AssertionError is raised if there are any missing events.
+
event: the event whose auth_events we want
Returns:
all of the events listed in `event.auth_events_ids`, after deduplication
Raises:
+ AssertionError if some auth events were missing and no `destination` was
+ supplied.
+
AuthError if we were unable to fetch the auth_events for any reason.
"""
event_auth_event_ids = set(event.auth_event_ids())
@@ -1784,6 +1996,13 @@ class FederationEventHandler:
)
if not missing_auth_event_ids:
return event_auth_events.values()
+ if destination is None:
+ # this shouldn't happen: destination must be set unless we know we have already
+ # persisted the auth events.
+ raise AssertionError(
+ "_load_or_fetch_auth_events_for_event() called with no destination for "
+ "an event with missing auth_events"
+ )
logger.info(
"Event %s refers to unknown auth events %s: fetching auth chain",
@@ -1819,6 +2038,8 @@ class FederationEventHandler:
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+ @trace
+ @tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1847,6 +2068,7 @@ class FederationEventHandler:
await self._auth_and_persist_outliers(room_id, remote_auth_events)
+ @trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
@@ -1955,8 +2177,17 @@ class FederationEventHandler:
self._message_handler.maybe_schedule_expiry(event)
if not backfilled: # Never notify for backfilled events
- for event in events:
- await self._notify_persisted_event(event, max_stream_token)
+ with start_active_span("notify_persisted_events"):
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids",
+ str([ev.event_id for ev in events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ for event in events:
+ await self._notify_persisted_event(event, max_stream_token)
return max_stream_token.stream
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 85b472f250..860c82c110 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -143,8 +143,8 @@ class InitialSyncHandler:
joined_rooms,
to_key=int(now_token.receipt_key),
)
- if self.hs.config.experimental.msc2285_enabled:
- receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
+
+ receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -309,18 +309,18 @@ class InitialSyncHandler:
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
- user_id = requester.user.to_string()
-
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id,
- user_id,
+ requester,
allow_departed_users=True,
)
is_peeking = member_event_id is None
+ user_id = requester.user.to_string()
+
if membership == Membership.JOIN:
result = await self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
@@ -456,11 +456,8 @@ class InitialSyncHandler:
)
if not receipts:
return []
- if self.hs.config.experimental.msc2285_enabled:
- receipts = ReceiptEventSource.filter_out_private_receipts(
- receipts, user_id
- )
- return receipts
+
+ return ReceiptEventSource.filter_out_private_receipts(receipts, user_id)
presence, receipts, (messages, token) = await make_deferred_yieldable(
gather_results(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bd7baef051..acd3de06f6 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -41,6 +41,7 @@ from synapse.api.errors import (
NotFoundError,
ShadowBanError,
SynapseError,
+ UnstableSpecAuthError,
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -51,6 +52,7 @@ from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
+from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
@@ -102,7 +104,7 @@ class MessageHandler:
async def get_room_data(
self,
- user_id: str,
+ requester: Requester,
room_id: str,
event_type: str,
state_key: str,
@@ -110,7 +112,7 @@ class MessageHandler:
"""Get data from a room.
Args:
- user_id
+ requester: The user who did the request.
room_id
event_type
state_key
@@ -123,7 +125,7 @@ class MessageHandler:
membership,
membership_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if membership == Membership.JOIN:
@@ -149,17 +151,20 @@ class MessageHandler:
"Attempted to retrieve data from a room for a user that has never been in it. "
"This should not have happened."
)
- raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
+ raise UnstableSpecAuthError(
+ 403,
+ "User not in room",
+ errcode=Codes.NOT_JOINED,
+ )
return data
async def get_state_events(
self,
- user_id: str,
+ requester: Requester,
room_id: str,
state_filter: Optional[StateFilter] = None,
at_token: Optional[StreamToken] = None,
- is_guest: bool = False,
) -> List[dict]:
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -168,14 +173,13 @@ class MessageHandler:
visible.
Args:
- user_id: The user requesting state events.
+ requester: The user requesting state events.
room_id: The room ID to get all state events from.
state_filter: The state filter used to fetch state from the database.
at_token: the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
- is_guest: whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
@@ -185,6 +189,7 @@ class MessageHandler:
members of this room.
"""
state_filter = state_filter or StateFilter.all()
+ user_id = requester.user.to_string()
if at_token:
last_event_id = (
@@ -217,7 +222,7 @@ class MessageHandler:
membership,
membership_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if membership == Membership.JOIN:
@@ -311,30 +316,42 @@ class MessageHandler:
Returns:
A dict of user_id to profile info
"""
- user_id = requester.user.to_string()
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
membership, _ = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if membership != Membership.JOIN:
- raise NotImplementedError(
- "Getting joined members after leaving is not implemented"
+ raise SynapseError(
+ code=403,
+ errcode=Codes.FORBIDDEN,
+ msg="Getting joined members while not being a current member of the room is forbidden.",
)
- users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
+ users_with_profile = (
+ await self._state_storage_controller.get_users_in_room_with_profiles(
+ room_id
+ )
+ )
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if (
+ requester.app_service
+ and requester.user.to_string() not in users_with_profile
+ ):
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
else:
# Loop fell through, AS has no interested users in room
- raise AuthError(403, "Appservice not in room")
+ raise UnstableSpecAuthError(
+ 403,
+ "Appservice not in room",
+ errcode=Codes.NOT_JOINED,
+ )
return {
user_id: {
@@ -1135,6 +1152,10 @@ class EventCreationHandler:
context = await self.state.compute_event_context(
event,
state_ids_before_event=state_map_for_event,
+ # TODO(faster_joins): check how MSC2716 works and whether we can have
+ # partial state here
+ # https://github.com/matrix-org/synapse/issues/13003
+ partial_state=False,
)
else:
context = await self.state.compute_event_context(event)
@@ -1359,9 +1380,10 @@ class EventCreationHandler:
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
- await self._bulk_push_rule_evaluator.action_for_event_by_user(
- event, context
- )
+ with opentracing.start_active_span("calculate_push_actions"):
+ await self._bulk_push_rule_evaluator.action_for_event_by_user(
+ event, context
+ )
try:
# If we're a worker we need to hit out to the master.
@@ -1448,9 +1470,10 @@ class EventCreationHandler:
state = await state_entry.get_state(
self._storage_controllers.state, StateFilter.all()
)
- joined_hosts = await self.store.get_joined_hosts(
- event.room_id, state, state_entry
- )
+ with opentracing.start_active_span("get_joined_hosts"):
+ joined_hosts = await self.store.get_joined_hosts(
+ event.room_id, state, state_entry
+ )
# Note that the expiry times must be larger than the expiry time in
# _external_cache_joined_hosts_updates.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 6262a35822..74e944bce7 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -24,6 +24,7 @@ from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
+from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
@@ -416,6 +417,7 @@ class PaginationHandler:
await self._storage_controllers.purge_events.purge_room(room_id)
+ @trace
async def get_messages(
self,
requester: Requester,
@@ -462,7 +464,7 @@ class PaginationHandler:
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if pagin_config.direction == "b":
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 895ea63ed3..741504ba9f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,7 +34,6 @@ from typing import (
Callable,
Collection,
Dict,
- FrozenSet,
Generator,
Iterable,
List,
@@ -42,7 +41,6 @@ from typing import (
Set,
Tuple,
Type,
- Union,
)
from prometheus_client import Counter
@@ -68,7 +66,6 @@ from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -1656,15 +1653,18 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# doesn't return. C.f. #5503.
return [], max_token
- # Figure out which other users this user should receive updates for
- users_interested_in = await self._get_interested_in(user, explicit_room_id)
+ # Figure out which other users this user should explicitly receive
+ # updates for
+ additional_users_interested_in = (
+ await self.get_presence_router().get_interested_users(user.to_string())
+ )
# We have a set of users that we're interested in the presence of. We want to
# cross-reference that with the users that have actually changed their presence.
# Check whether this user should see all user updates
- if users_interested_in == PresenceRouter.ALL_USERS:
+ if additional_users_interested_in == PresenceRouter.ALL_USERS:
# Provide presence state for all users
presence_updates = await self._filter_all_presence_updates_for_user(
user_id, include_offline, from_key
@@ -1673,34 +1673,47 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
return presence_updates, max_token
# Make mypy happy. users_interested_in should now be a set
- assert not isinstance(users_interested_in, str)
+ assert not isinstance(additional_users_interested_in, str)
+
+ # We always care about our own presence.
+ additional_users_interested_in.add(user_id)
+
+ if explicit_room_id:
+ user_ids = await self.store.get_users_in_room(explicit_room_id)
+ additional_users_interested_in.update(user_ids)
# The set of users that we're interested in and that have had a presence update.
# We'll actually pull the presence updates for these users at the end.
- interested_and_updated_users: Union[Set[str], FrozenSet[str]] = set()
+ interested_and_updated_users: Collection[str]
if from_key is not None:
# First get all users that have had a presence update
updated_users = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates.
- # Use a slightly-optimised method for processing smaller sets of updates.
- if updated_users is not None and len(updated_users) < 500:
- # For small deltas, it's quicker to get all changes and then
- # cross-reference with the users we're interested in
+ if updated_users is not None:
+ # If we have the full list of changes for presence we can
+ # simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
- for other_user_id in updated_users:
- if other_user_id in users_interested_in:
- # mypy thinks this variable could be a FrozenSet as it's possibly set
- # to one in the `get_entities_changed` call below, and `add()` is not
- # method on a FrozenSet. That doesn't affect us here though, as
- # `interested_and_updated_users` is clearly a set() above.
- interested_and_updated_users.add(other_user_id) # type: ignore
+
+ sharing_users = await self.store.do_users_share_a_room(
+ user_id, updated_users
+ )
+
+ interested_and_updated_users = (
+ sharing_users.union(additional_users_interested_in)
+ ).intersection(updated_users)
+
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.labels("full").inc()
+ users_interested_in = (
+ await self.store.get_users_who_share_room_with_user(user_id)
+ )
+ users_interested_in.update(additional_users_interested_in)
+
interested_and_updated_users = (
stream_change_cache.get_entities_changed(
users_interested_in, from_key
@@ -1709,7 +1722,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
else:
# No from_key has been specified. Return the presence for all users
# this user is interested in
- interested_and_updated_users = users_interested_in
+ interested_and_updated_users = (
+ await self.store.get_users_who_share_room_with_user(user_id)
+ )
+ interested_and_updated_users.update(additional_users_interested_in)
# Retrieve the current presence state for each user
users_to_state = await self.get_presence_handler().current_state_for_users(
@@ -1804,62 +1820,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def get_current_key(self) -> int:
return self.store.get_current_presence_token()
- @cached(num_args=2, cache_context=True)
- async def _get_interested_in(
- self,
- user: UserID,
- explicit_room_id: Optional[str] = None,
- cache_context: Optional[_CacheContext] = None,
- ) -> Union[Set[str], str]:
- """Returns the set of users that the given user should see presence
- updates for.
-
- Args:
- user: The user to retrieve presence updates for.
- explicit_room_id: The users that are in the room will be returned.
-
- Returns:
- A set of user IDs to return presence updates for, or "ALL" to return all
- known updates.
- """
- user_id = user.to_string()
- users_interested_in = set()
- users_interested_in.add(user_id) # So that we receive our own presence
-
- # cache_context isn't likely to ever be None due to the @cached decorator,
- # but we can't have a non-optional argument after the optional argument
- # explicit_room_id either. Assert cache_context is not None so we can use it
- # without mypy complaining.
- assert cache_context
-
- # Check with the presence router whether we should poll additional users for
- # their presence information
- additional_users = await self.get_presence_router().get_interested_users(
- user.to_string()
- )
- if additional_users == PresenceRouter.ALL_USERS:
- # If the module requested that this user see the presence updates of *all*
- # users, then simply return that instead of calculating what rooms this
- # user shares
- return PresenceRouter.ALL_USERS
-
- # Add the additional users from the router
- users_interested_in.update(additional_users)
-
- # Find the users who share a room with this user
- users_who_share_room = await self.store.get_users_who_share_room_with_user(
- user_id, on_invalidate=cache_context.invalidate
- )
- users_interested_in.update(users_who_share_room)
-
- if explicit_room_id:
- user_ids = await self.store.get_users_in_room(
- explicit_room_id, on_invalidate=cache_context.invalidate
- )
- users_interested_in.update(user_ids)
-
- return users_interested_in
-
def handle_timeouts(
user_states: List[UserPresenceState],
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 43d2882b0a..d4a866b346 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -163,7 +163,10 @@ class ReceiptsHandler:
if not is_new:
return
- if self.federation_sender and receipt_type != ReceiptTypes.READ_PRIVATE:
+ if self.federation_sender and receipt_type not in (
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ):
await self.federation_sender.send_read_receipt(receipt)
@@ -203,24 +206,38 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
for event_id, orig_event_content in room.get("content", {}).items():
event_content = orig_event_content
# If there are private read receipts, additional logic is necessary.
- if ReceiptTypes.READ_PRIVATE in event_content:
+ if (
+ ReceiptTypes.READ_PRIVATE in event_content
+ or ReceiptTypes.UNSTABLE_READ_PRIVATE in event_content
+ ):
# Make a copy without private read receipts to avoid leaking
# other user's private read receipts..
event_content = {
receipt_type: receipt_value
for receipt_type, receipt_value in event_content.items()
- if receipt_type != ReceiptTypes.READ_PRIVATE
+ if receipt_type
+ not in (
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ )
}
# Copy the current user's private read receipt from the
# original content, if it exists.
- user_private_read_receipt = orig_event_content[
- ReceiptTypes.READ_PRIVATE
- ].get(user_id, None)
+ user_private_read_receipt = orig_event_content.get(
+ ReceiptTypes.READ_PRIVATE, {}
+ ).get(user_id, None)
if user_private_read_receipt:
event_content[ReceiptTypes.READ_PRIVATE] = {
user_id: user_private_read_receipt
}
+ user_unstable_private_read_receipt = orig_event_content.get(
+ ReceiptTypes.UNSTABLE_READ_PRIVATE, {}
+ ).get(user_id, None)
+ if user_unstable_private_read_receipt:
+ event_content[ReceiptTypes.UNSTABLE_READ_PRIVATE] = {
+ user_id: user_unstable_private_read_receipt
+ }
# Include the event if there is at least one non-private read
# receipt or the current user has a private read receipt.
@@ -256,10 +273,9 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
room_ids, from_key=from_key, to_key=to_key
)
- if self.config.experimental.msc2285_enabled:
- events = ReceiptEventSource.filter_out_private_receipts(
- events, user.to_string()
- )
+ events = ReceiptEventSource.filter_out_private_receipts(
+ events, user.to_string()
+ )
return events, to_key
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c77d181722..20ec22105a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -29,7 +29,13 @@ from synapse.api.constants import (
JoinRules,
LoginType,
)
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ ConsentNotGivenError,
+ InvalidClientTokenError,
+ SynapseError,
+)
from synapse.appservice import ApplicationService
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
@@ -180,10 +186,7 @@ class RegistrationHandler:
)
if guest_access_token:
user_data = await self.auth.get_user_by_access_token(guest_access_token)
- if (
- not user_data.is_guest
- or UserID.from_string(user_data.user_id).localpart != localpart
- ):
+ if not user_data.is_guest or user_data.user.localpart != localpart:
raise AuthError(
403,
"Cannot register taken user ID without valid guest "
@@ -618,7 +621,7 @@ class RegistrationHandler:
user_id = user.to_string()
service = self.store.get_app_service_by_token(as_token)
if not service:
- raise AuthError(403, "Invalid application service token.")
+ raise InvalidClientTokenError()
if not service.is_interested_in_user(user_id):
raise SynapseError(
400,
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 0b63cd2186..28d7093f08 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -19,6 +19,7 @@ import attr
from synapse.api.constants import RelationTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event
+from synapse.logging.opentracing import trace
from synapse.storage.databases.main.relations import _RelatedEvent
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.visibility import filter_events_for_client
@@ -73,7 +74,6 @@ class RelationsHandler:
room_id: str,
relation_type: Optional[str] = None,
event_type: Optional[str] = None,
- aggregation_key: Optional[str] = None,
limit: int = 5,
direction: str = "b",
from_token: Optional[StreamToken] = None,
@@ -89,7 +89,6 @@ class RelationsHandler:
room_id: The room the event belongs to.
relation_type: Only fetch events with this relation type, if given.
event_type: Only fetch events with this event type, if given.
- aggregation_key: Only fetch events with this aggregation key, if given.
limit: Only fetch the most recent `limit` events.
direction: Whether to fetch the most recent first (`"b"`) or the
oldest first (`"f"`).
@@ -104,7 +103,7 @@ class RelationsHandler:
# TODO Properly handle a user leaving a room.
(_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
# This gets the original event and checks that a) the event exists and
@@ -122,7 +121,6 @@ class RelationsHandler:
room_id=room_id,
relation_type=relation_type,
event_type=event_type,
- aggregation_key=aggregation_key,
limit=limit,
direction=direction,
from_token=from_token,
@@ -364,6 +362,7 @@ class RelationsHandler:
return results
+ @trace
async def get_bundled_aggregations(
self, events: Iterable[EventBase], user_id: str
) -> Dict[str, BundledAggregations]:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 978d3ee39f..2bf0ebd025 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -721,7 +721,7 @@ class RoomCreationHandler:
# allow the server notices mxid to create rooms
is_requester_admin = True
else:
- is_requester_admin = await self.auth.is_server_admin(requester.user)
+ is_requester_admin = await self.auth.is_server_admin(requester)
# Let the third party rules modify the room creation config if needed, or abort
# the room creation entirely with an exception.
@@ -1279,7 +1279,7 @@ class RoomContextHandler:
"""
user = requester.user
if use_admin_priviledge:
- await assert_user_is_admin(self.auth, requester.user)
+ await assert_user_is_admin(self.auth, requester)
before_limit = math.floor(limit / 2.0)
after_limit = limit - before_limit
@@ -1384,6 +1384,7 @@ class TimestampLookupHandler:
self.store = hs.get_datastores().main
self.state_handler = hs.get_state_handler()
self.federation_client = hs.get_federation_client()
+ self.federation_event_handler = hs.get_federation_event_handler()
self._storage_controllers = hs.get_storage_controllers()
async def get_event_for_timestamp(
@@ -1479,38 +1480,68 @@ class TimestampLookupHandler:
remote_response,
)
- # TODO: Do we want to persist this as an extremity?
- # TODO: I think ideally, we would try to backfill from
- # this event and run this whole
- # `get_event_for_timestamp` function again to make sure
- # they didn't give us an event from their gappy history.
remote_event_id = remote_response.event_id
- origin_server_ts = remote_response.origin_server_ts
+ remote_origin_server_ts = remote_response.origin_server_ts
+
+ # Backfill this event so we can get a pagination token for
+ # it with `/context` and paginate `/messages` from this
+ # point.
+ #
+ # TODO: The requested timestamp may lie in a part of the
+ # event graph that the remote server *also* didn't have,
+ # in which case they will have returned another event
+ # which may be nowhere near the requested timestamp. In
+ # the future, we may need to reconcile that gap and ask
+ # other homeservers, and/or extend `/timestamp_to_event`
+ # to return events on *both* sides of the timestamp to
+ # help reconcile the gap faster.
+ remote_event = (
+ await self.federation_event_handler.backfill_event_id(
+ domain, room_id, remote_event_id
+ )
+ )
+
+ # XXX: When we see that the remote server is not trustworthy,
+ # maybe we should not ask them first in the future.
+ if remote_origin_server_ts != remote_event.origin_server_ts:
+ logger.info(
+ "get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
+ domain,
+ remote_event_id,
+ remote_origin_server_ts,
+ remote_event.origin_server_ts,
+ )
# Only return the remote event if it's closer than the local event
if not local_event or (
- abs(origin_server_ts - timestamp)
+ abs(remote_event.origin_server_ts - timestamp)
< abs(local_event.origin_server_ts - timestamp)
):
- return remote_event_id, origin_server_ts
+ logger.info(
+ "get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
+ remote_event_id,
+ remote_event.origin_server_ts,
+ timestamp,
+ local_event.event_id if local_event else None,
+ local_event.origin_server_ts if local_event else None,
+ )
+ return remote_event_id, remote_origin_server_ts
except (HttpResponseException, InvalidResponseError) as ex:
# Let's not put a high priority on some other homeserver
# failing to respond or giving a random response
logger.debug(
- "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+ "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
domain,
type(ex).__name__,
ex,
ex.args,
)
- except Exception as ex:
+ except Exception:
# But we do want to see some exceptions in our code
logger.warning(
- "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
+ "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception",
domain,
- type(ex).__name__,
- ex,
- ex.args,
+ exc_info=True,
)
# To appease mypy, we have to add both of these conditions to check for
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 29868eb743..bb0bdb8e6f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -182,7 +182,7 @@ class RoomListHandler:
== HistoryVisibility.WORLD_READABLE,
"guest_can_join": room["guest_access"] == "can_join",
"join_rule": room["join_rules"],
- "org.matrix.msc3827.room_type": room["room_type"],
+ "room_type": room["room_type"],
}
# Filter out Nones – rather omit the field altogether
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ef2fa6bb6f..9c0fdeca15 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -32,6 +32,7 @@ from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -178,7 +179,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"""Try and join a room that this server is not in
Args:
- requester
+ requester: The user making the request, according to the access token.
remote_room_hosts: List of servers that can be used to join via.
room_id: Room that we are trying to join
user: User who is trying to join
@@ -430,14 +431,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)
-
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- event,
- context,
- extra_users=[target],
- ratelimit=ratelimit,
- )
+ with opentracing.start_active_span("handle_new_client_event"):
+ result_event = await self.event_creation_handler.handle_new_client_event(
+ requester,
+ event,
+ context,
+ extra_users=[target],
+ ratelimit=ratelimit,
+ )
if event.membership == Membership.LEAVE:
if prev_member_event_id:
@@ -566,25 +567,26 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
- result = await self.update_membership_locked(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- new_room=new_room,
- require_consent=require_consent,
- outlier=outlier,
- historical=historical,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- depth=depth,
- )
+ with opentracing.start_active_span("update_membership_locked"):
+ result = await self.update_membership_locked(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ new_room=new_room,
+ require_consent=require_consent,
+ outlier=outlier,
+ historical=historical,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ depth=depth,
+ )
return result
@@ -651,6 +653,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
Returns:
A tuple of the new event ID and stream ID.
"""
+
content_specified = bool(content)
if content is None:
content = {}
@@ -688,7 +691,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
errcode=Codes.BAD_JSON,
)
- if "avatar_url" in content:
+ if "avatar_url" in content and content.get("avatar_url") is not None:
if not await self.profile_handler.check_avatar_size_and_mime_type(
content["avatar_url"],
):
@@ -743,7 +746,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
is_requester_admin = True
else:
- is_requester_admin = await self.auth.is_server_admin(requester.user)
+ is_requester_admin = await self.auth.is_server_admin(requester)
if not is_requester_admin:
if self.config.server.block_non_admin_invites:
@@ -878,7 +881,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
bypass_spam_checker = True
else:
- bypass_spam_checker = await self.auth.is_server_admin(requester.user)
+ bypass_spam_checker = await self.auth.is_server_admin(requester)
inviter = await self._get_inviter(target.to_string(), room_id)
if (
@@ -1438,7 +1441,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ShadowBanError if the requester has been shadow-banned.
"""
if self.config.server.block_non_admin_invites:
- is_requester_admin = await self.auth.is_server_admin(requester.user)
+ is_requester_admin = await self.auth.is_server_admin(requester)
if not is_requester_admin:
raise SynapseError(
403, "Invites have been disabled on this server", Codes.FORBIDDEN
@@ -1710,14 +1713,18 @@ class RoomMemberMasterHandler(RoomMemberHandler):
]
if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
+ raise SynapseError(
+ 404,
+ "Can't join remote room because no servers "
+ "that are in the room have been provided.",
+ )
check_complexity = self.hs.config.server.limit_remote_rooms.enabled
if (
check_complexity
and self.hs.config.server.limit_remote_rooms.admins_can_join
):
- check_complexity = not await self.auth.is_server_admin(user)
+ check_complexity = not await self.store.is_server_admin(user)
if check_complexity:
# Fetch the room complexity
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 13098f56ed..732b0310bc 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -28,11 +28,11 @@ from synapse.api.constants import (
RoomTypes,
)
from synapse.api.errors import (
- AuthError,
Codes,
NotFoundError,
StoreError,
SynapseError,
+ UnstableSpecAuthError,
UnsupportedRoomVersionError,
)
from synapse.api.ratelimiting import Ratelimiter
@@ -175,10 +175,11 @@ class RoomSummaryHandler:
# First of all, check that the room is accessible.
if not await self._is_local_room_accessible(requested_room_id, requester):
- raise AuthError(
+ raise UnstableSpecAuthError(
403,
"User %s not in room %s, and room previews are disabled"
% (requester, requested_room_id),
+ errcode=Codes.NOT_JOINED,
)
# If this is continuing a previous session, pull the persisted data.
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py
index a305a66860..e2844799e8 100644
--- a/synapse/handlers/send_email.py
+++ b/synapse/handlers/send_email.py
@@ -23,10 +23,12 @@ from pkg_resources import parse_version
import twisted
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorTCP
+from twisted.internet.interfaces import IOpenSSLContextFactory
+from twisted.internet.ssl import optionsForClientTLS
from twisted.mail.smtp import ESMTPSender, ESMTPSenderFactory
from synapse.logging.context import make_deferred_yieldable
+from synapse.types import ISynapseReactor
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -48,7 +50,7 @@ class _NoTLSESMTPSender(ESMTPSender):
async def _sendmail(
- reactor: IReactorTCP,
+ reactor: ISynapseReactor,
smtphost: str,
smtpport: int,
from_addr: str,
@@ -59,6 +61,7 @@ async def _sendmail(
require_auth: bool = False,
require_tls: bool = False,
enable_tls: bool = True,
+ force_tls: bool = False,
) -> None:
"""A simple wrapper around ESMTPSenderFactory, to allow substitution in tests
@@ -73,8 +76,9 @@ async def _sendmail(
password: password to give when authenticating
require_auth: if auth is not offered, fail the request
require_tls: if TLS is not offered, fail the reqest
- enable_tls: True to enable TLS. If this is False and require_tls is True,
+ enable_tls: True to enable STARTTLS. If this is False and require_tls is True,
the request will fail.
+ force_tls: True to enable Implicit TLS.
"""
msg = BytesIO(msg_bytes)
d: "Deferred[object]" = Deferred()
@@ -105,13 +109,23 @@ async def _sendmail(
# set to enable TLS.
factory = build_sender_factory(hostname=smtphost if enable_tls else None)
- reactor.connectTCP(
- smtphost,
- smtpport,
- factory,
- timeout=30,
- bindAddress=None,
- )
+ if force_tls:
+ reactor.connectSSL(
+ smtphost,
+ smtpport,
+ factory,
+ optionsForClientTLS(smtphost),
+ timeout=30,
+ bindAddress=None,
+ )
+ else:
+ reactor.connectTCP(
+ smtphost,
+ smtpport,
+ factory,
+ timeout=30,
+ bindAddress=None,
+ )
await make_deferred_yieldable(d)
@@ -132,6 +146,7 @@ class SendEmailHandler:
self._smtp_pass = passwd.encode("utf-8") if passwd is not None else None
self._require_transport_security = hs.config.email.require_transport_security
self._enable_tls = hs.config.email.enable_smtp_tls
+ self._force_tls = hs.config.email.force_tls
self._sendmail = _sendmail
@@ -189,4 +204,5 @@ class SendEmailHandler:
require_auth=self._smtp_user is not None,
require_tls=self._require_transport_security,
enable_tls=self._enable_tls,
+ force_tls=self._force_tls,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d42a414c90..2d95b1fa24 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,19 @@
# limitations under the License.
import itertools
import logging
-from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Collection,
+ Dict,
+ FrozenSet,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+)
import attr
from prometheus_client import Counter
@@ -89,7 +101,7 @@ class SyncConfig:
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TimelineBatch:
prev_batch: StreamToken
- events: List[EventBase]
+ events: Sequence[EventBase]
limited: bool
# A mapping of event ID to the bundled aggregations for the above events.
# This is only calculated if limited is true.
@@ -507,10 +519,17 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
+ # FIXME(faster_joins): We use the partial state here as
+ # we don't want to block `/sync` on finishing a lazy join.
+ # Which should be fine once
+ # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+ # since we shouldn't reach here anymore?
+ # Note that we use the current state as a whitelist for filtering
+ # `recents`, so partial state is only a problem when a membership
+ # event turns up in `recents` but has not made it into the current
+ # state.
current_state_ids_map = (
- await self._state_storage_controller.get_current_state_ids(
- room_id
- )
+ await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())
@@ -579,7 +598,13 @@ class SyncHandler:
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
- # Is this the correct way of doing it?
+ # Which should be fine once
+ # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+ # since we shouldn't reach here anymore?
+ # Note that we use the current state as a whitelist for filtering
+ # `loaded_recents`, so partial state is only a problem when a
+ # membership event turns up in `loaded_recents` but has not made it
+ # into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
@@ -627,7 +652,10 @@ class SyncHandler:
)
async def get_state_after_event(
- self, event_id: str, state_filter: Optional[StateFilter] = None
+ self,
+ event_id: str,
+ state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the room state after the given event
@@ -635,9 +663,14 @@ class SyncHandler:
Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
+ await_full_state: if `True`, will block if we do not yet have complete state
+ at the event and `state_filter` is not satisfied by partial state.
+ Defaults to `True`.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
- event_id, state_filter=state_filter or StateFilter.all()
+ event_id,
+ state_filter=state_filter or StateFilter.all(),
+ await_full_state=await_full_state,
)
# using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -660,6 +693,7 @@ class SyncHandler:
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position
@@ -667,6 +701,9 @@ class SyncHandler:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
+ await_full_state: if `True`, will block if we do not yet have complete state
+ at the last event in the room before `stream_position` and
+ `state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
@@ -678,7 +715,9 @@ class SyncHandler:
if last_event_id:
state = await self.get_state_after_event(
- last_event_id, state_filter=state_filter or StateFilter.all()
+ last_event_id,
+ state_filter=state_filter or StateFilter.all(),
+ await_full_state=await_full_state,
)
else:
@@ -852,16 +891,26 @@ class SyncHandler:
now_token: StreamToken,
full_state: bool,
) -> MutableStateMap[EventBase]:
- """Works out the difference in state between the start of the timeline
- and the previous sync.
+ """Works out the difference in state between the end of the previous sync and
+ the start of the timeline.
Args:
room_id:
batch: The timeline batch for the room that will be sent to the user.
sync_config:
- since_token: Token of the end of the previous batch. May be None.
+ since_token: Token of the end of the previous batch. May be `None`.
now_token: Token of the end of the current batch.
full_state: Whether to force returning the full state.
+ `lazy_load_members` still applies when `full_state` is `True`.
+
+ Returns:
+ The state to return in the sync response for the room.
+
+ Clients will overlay this onto the state at the end of the previous sync to
+ arrive at the state at the start of the timeline.
+
+ Clients will then overlay state events in the timeline to arrive at the
+ state at the end of the timeline, in preparation for the next sync.
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -869,8 +918,17 @@ class SyncHandler:
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+ # The memberships needed for events in the timeline.
+ # Only calculated when `lazy_load_members` is on.
+ members_to_fetch: Optional[Set[str]] = None
+
+ # A dictionary mapping user IDs to the first event in the timeline sent by
+ # them. Only calculated when `lazy_load_members` is on.
+ first_event_by_sender_map: Optional[Dict[str, EventBase]] = None
- members_to_fetch = None
+ # The contribution to the room state from state events in the timeline.
+ # Only contains the last event for any given state key.
+ timeline_state: StateMap[str]
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -881,10 +939,23 @@ class SyncHandler:
# We only request state for the members needed to display the
# timeline:
- members_to_fetch = {
- event.sender # FIXME: we also care about invite targets etc.
- for event in batch.events
- }
+ timeline_state = {}
+
+ members_to_fetch = set()
+ first_event_by_sender_map = {}
+ for event in batch.events:
+ # Build the map from user IDs to the first timeline event they sent.
+ if event.sender not in first_event_by_sender_map:
+ first_event_by_sender_map[event.sender] = event
+
+ # We need the event's sender, unless their membership was in a
+ # previous timeline event.
+ if (EventTypes.Member, event.sender) not in timeline_state:
+ members_to_fetch.add(event.sender)
+ # FIXME: we also care about invite targets etc.
+
+ if event.is_state():
+ timeline_state[(event.type, event.state_key)] = event.event_id
if full_state:
# always make sure we LL ourselves so we know we're in the room
@@ -894,55 +965,80 @@ class SyncHandler:
members_to_fetch.add(sync_config.user.to_string())
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+
+ # We are happy to use partial state to compute the `/sync` response.
+ # Since partial state may not include the lazy-loaded memberships we
+ # require, we fix up the state response afterwards with memberships from
+ # auth events.
+ await_full_state = False
else:
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events
+ if event.is_state()
+ }
+
state_filter = StateFilter.all()
+ await_full_state = True
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events
- if event.is_state()
- }
+ # Now calculate the state to return in the sync response for the room.
+ # This is more or less the change in state between the end of the previous
+ # sync's timeline and the start of the current sync's timeline.
+ # See the docstring above for details.
+ state_ids: StateMap[str]
if full_state:
if batch:
- current_state_ids = (
+ state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id, state_filter=state_filter
+ batch.events[-1].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
- state_ids = (
+ state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id, state_filter=state_filter
+ batch.events[0].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
else:
- current_state_ids = await self.get_state_at(
- room_id, stream_position=now_token, state_filter=state_filter
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
- state_ids = current_state_ids
+ state_at_timeline_start = state_at_timeline_end
state_ids = _calculate_state(
timeline_contains=timeline_state,
- timeline_start=state_ids,
- previous={},
- current=current_state_ids,
+ timeline_start=state_at_timeline_start,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end={},
lazy_load_members=lazy_load_members,
)
elif batch.limited:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id, state_filter=state_filter
+ batch.events[0].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
- room_id, stream_position=now_token, state_filter=state_filter
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
# for now, we disable LL for gappy syncs - see
@@ -964,28 +1060,35 @@ class SyncHandler:
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
- room_id, stream_position=since_token, state_filter=state_filter
+ room_id,
+ stream_position=since_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
if batch:
- current_state_ids = (
+ state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id, state_filter=state_filter
+ batch.events[-1].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
else:
- # Its not clear how we get here, but empirically we do
- # (#5407). Logging has been added elsewhere to try and
- # figure out where this state comes from.
- current_state_ids = await self.get_state_at(
- room_id, stream_position=now_token, state_filter=state_filter
+ # We can get here if the user has ignored the senders of all
+ # the recent events.
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
- previous=state_at_previous_sync,
- current=current_state_ids,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end=state_at_previous_sync,
# we have to include LL members in case LL initial sync missed them
lazy_load_members=lazy_load_members,
)
@@ -1008,8 +1111,30 @@ class SyncHandler:
(EventTypes.Member, member)
for member in members_to_fetch
),
+ await_full_state=False,
)
+ # If we only have partial state for the room, `state_ids` may be missing the
+ # memberships we wanted. We attempt to find some by digging through the auth
+ # events of timeline events.
+ if lazy_load_members and await self.store.is_partial_state_room(room_id):
+ assert members_to_fetch is not None
+ assert first_event_by_sender_map is not None
+
+ additional_state_ids = (
+ await self._find_missing_partial_state_memberships(
+ room_id, members_to_fetch, first_event_by_sender_map, state_ids
+ )
+ )
+ state_ids = {**state_ids, **additional_state_ids}
+
+ # At this point, if `lazy_load_members` is enabled, `state_ids` includes
+ # the memberships of all event senders in the timeline. This is because we
+ # may not have sent the memberships in a previous sync.
+
+ # When `include_redundant_members` is on, we send all the lazy-loaded
+ # memberships of event senders. Otherwise we make an effort to limit the set
+ # of memberships we send to those that we have not already sent to this client.
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
@@ -1051,6 +1176,99 @@ class SyncHandler:
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
}
+ async def _find_missing_partial_state_memberships(
+ self,
+ room_id: str,
+ members_to_fetch: Collection[str],
+ events_with_membership_auth: Mapping[str, EventBase],
+ found_state_ids: StateMap[str],
+ ) -> StateMap[str]:
+ """Finds missing memberships from a set of auth events and returns them as a
+ state map.
+
+ Args:
+ room_id: The partial state room to find the remaining memberships for.
+ members_to_fetch: The memberships to find.
+ events_with_membership_auth: A mapping from user IDs to events whose auth
+ events are known to contain their membership.
+ found_state_ids: A dict from (type, state_key) -> state_event_id, containing
+ memberships that have been previously found. Entries in
+ `members_to_fetch` that have a membership in `found_state_ids` are
+ ignored.
+
+ Returns:
+ A dict from ("m.room.member", state_key) -> state_event_id, containing the
+ memberships missing from `found_state_ids`.
+
+ Raises:
+ KeyError: if `events_with_membership_auth` does not have an entry for a
+ missing membership. Memberships in `found_state_ids` do not need an
+ entry in `events_with_membership_auth`.
+ """
+ additional_state_ids: MutableStateMap[str] = {}
+
+ # Tracks the missing members for logging purposes.
+ missing_members = set()
+
+ # Identify memberships missing from `found_state_ids` and pick out the auth
+ # events in which to look for them.
+ auth_event_ids: Set[str] = set()
+ for member in members_to_fetch:
+ if (EventTypes.Member, member) in found_state_ids:
+ continue
+
+ missing_members.add(member)
+ event_with_membership_auth = events_with_membership_auth[member]
+ auth_event_ids.update(event_with_membership_auth.auth_event_ids())
+
+ auth_events = await self.store.get_events(auth_event_ids)
+
+ # Run through the missing memberships once more, picking out the memberships
+ # from the pile of auth events we have just fetched.
+ for member in members_to_fetch:
+ if (EventTypes.Member, member) in found_state_ids:
+ continue
+
+ event_with_membership_auth = events_with_membership_auth[member]
+
+ # Dig through the auth events to find the desired membership.
+ for auth_event_id in event_with_membership_auth.auth_event_ids():
+ # We only store events once we have all their auth events,
+ # so the auth event must be in the pile we have just
+ # fetched.
+ auth_event = auth_events[auth_event_id]
+
+ if (
+ auth_event.type == EventTypes.Member
+ and auth_event.state_key == member
+ ):
+ missing_members.remove(member)
+ additional_state_ids[
+ (EventTypes.Member, member)
+ ] = auth_event.event_id
+ break
+
+ if missing_members:
+ # There really shouldn't be any missing memberships now. Either:
+ # * we couldn't find an auth event, which shouldn't happen because we do
+ # not persist events with persisting their auth events first, or
+ # * the set of auth events did not contain a membership we wanted, which
+ # means our caller didn't compute the events in `members_to_fetch`
+ # correctly, or we somehow accepted an event whose auth events were
+ # dodgy.
+ logger.error(
+ "Failed to find memberships for %s in partial state room "
+ "%s in the auth events of %s.",
+ missing_members,
+ room_id,
+ [
+ events_with_membership_auth[member].event_id
+ for member in missing_members
+ ],
+ )
+
+ return additional_state_ids
+
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> NotifCounts:
@@ -1536,15 +1754,13 @@ class SyncHandler:
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
- sync_result_builder, ignored_users, self.rooms_to_exclude
+ sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
- room_changes = await self._get_all_rooms(
- sync_result_builder, ignored_users, self.rooms_to_exclude
- )
+ room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
tags_by_room = await self.store.get_tags_for_user(user_id)
log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1623,13 +1839,14 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
- excluded_rooms: List[str],
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.
This function is a first pass at generating the rooms part of the sync response.
It determines which rooms have changed during the sync period, and categorises
- them into four buckets: "knock", "invite", "join" and "leave".
+ them into four buckets: "knock", "invite", "join" and "leave". It also excludes
+ from that list any room that appears in the list of rooms to exclude from sync
+ results in the server configuration.
1. Finds all membership changes for the user in the sync period (from
`since_token` up to `now_token`).
@@ -1655,7 +1872,7 @@ class SyncHandler:
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
membership_change_events = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key, excluded_rooms
+ user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
)
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
@@ -1696,7 +1913,11 @@ class SyncHandler:
continue
if room_id in sync_result_builder.joined_room_ids or has_join:
- old_state_ids = await self.get_state_at(room_id, since_token)
+ old_state_ids = await self.get_state_at(
+ room_id,
+ since_token,
+ state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
+ )
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
@@ -1722,7 +1943,13 @@ class SyncHandler:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
- old_state_ids = await self.get_state_at(room_id, since_token)
+ old_state_ids = await self.get_state_at(
+ room_id,
+ since_token,
+ state_filter=StateFilter.from_types(
+ [(EventTypes.Member, user_id)]
+ ),
+ )
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
@@ -1862,7 +2089,6 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
- ignored_rooms: List[str],
) -> _RoomChanges:
"""Returns entries for all rooms for the user.
@@ -1884,7 +2110,7 @@ class SyncHandler:
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
- excluded_rooms=ignored_rooms,
+ excluded_rooms=self.rooms_to_exclude,
)
room_entries = []
@@ -2150,7 +2376,9 @@ class SyncHandler:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(
- self, user_id: str, room_key: RoomStreamToken
+ self,
+ user_id: str,
+ room_key: RoomStreamToken,
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
@@ -2176,7 +2404,12 @@ class SyncHandler:
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
+ # We also need to check whether the room should be excluded from sync
+ # responses as per the homeserver config.
for joined_room in joined_rooms:
+ if joined_room.room_id in self.rooms_to_exclude:
+ continue
+
if not joined_room.event_pos.persisted_after(room_key):
joined_room_ids.add(joined_room.room_id)
continue
@@ -2188,10 +2421,10 @@ class SyncHandler:
joined_room.room_id, joined_room.event_pos.stream
)
)
- users_in_room = await self.state.get_current_users_in_room(
+ user_ids_in_room = await self.state.get_current_user_ids_in_room(
joined_room.room_id, extrems
)
- if user_id in users_in_room:
+ if user_id in user_ids_in_room:
joined_room_ids.add(joined_room.room_id)
return frozenset(joined_room_ids)
@@ -2211,8 +2444,8 @@ def _action_has_highlight(actions: List[JsonDict]) -> bool:
def _calculate_state(
timeline_contains: StateMap[str],
timeline_start: StateMap[str],
- previous: StateMap[str],
- current: StateMap[str],
+ timeline_end: StateMap[str],
+ previous_timeline_end: StateMap[str],
lazy_load_members: bool,
) -> StateMap[str]:
"""Works out what state to include in a sync response.
@@ -2220,45 +2453,50 @@ def _calculate_state(
Args:
timeline_contains: state in the timeline
timeline_start: state at the start of the timeline
- previous: state at the end of the previous sync (or empty dict
+ timeline_end: state at the end of the timeline
+ previous_timeline_end: state at the end of the previous sync (or empty dict
if this is an initial sync)
- current: state at the end of the timeline
lazy_load_members: whether to return members from timeline_start
or not. assumes that timeline_start has already been filtered to
include only the members the client needs to know about.
"""
- event_id_to_key = {
- e: key
- for key, e in itertools.chain(
+ event_id_to_state_key = {
+ event_id: state_key
+ for state_key, event_id in itertools.chain(
timeline_contains.items(),
- previous.items(),
timeline_start.items(),
- current.items(),
+ timeline_end.items(),
+ previous_timeline_end.items(),
)
}
- c_ids = set(current.values())
- ts_ids = set(timeline_start.values())
- p_ids = set(previous.values())
- tc_ids = set(timeline_contains.values())
+ timeline_end_ids = set(timeline_end.values())
+ timeline_start_ids = set(timeline_start.values())
+ previous_timeline_end_ids = set(previous_timeline_end.values())
+ timeline_contains_ids = set(timeline_contains.values())
# If we are lazyloading room members, we explicitly add the membership events
# for the senders in the timeline into the state block returned by /sync,
# as we may not have sent them to the client before. We find these membership
# events by filtering them out of timeline_start, which has already been filtered
# to only include membership events for the senders in the timeline.
- # In practice, we can do this by removing them from the p_ids list,
- # which is the list of relevant state we know we have already sent to the client.
+ # In practice, we can do this by removing them from the previous_timeline_end_ids
+ # list, which is the list of relevant state we know we have already sent to the
+ # client.
# see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
if lazy_load_members:
- p_ids.difference_update(
+ previous_timeline_end_ids.difference_update(
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
)
- state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
+ state_ids = (
+ (timeline_end_ids | timeline_start_ids)
+ - previous_timeline_end_ids
+ - timeline_contains_ids
+ )
- return {event_id_to_key[e]: e for e in state_ids}
+ return {event_id_to_state_key[e]: e for e in state_ids}
@attr.s(slots=True, auto_attribs=True)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d104ea07fe..bcac3372a2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -253,12 +253,11 @@ class TypingWriterHandler(FollowerTypingHandler):
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
) -> None:
target_user_id = target_user.to_string()
- auth_user_id = requester.user.to_string()
if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this homeserver")
- if target_user_id != auth_user_id:
+ if target_user != requester.user:
raise AuthError(400, "Cannot set another user's typing state")
if requester.shadow_banned:
@@ -266,7 +265,7 @@ class TypingWriterHandler(FollowerTypingHandler):
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()
- await self.auth.check_user_in_room(room_id, target_user_id)
+ await self.auth.check_user_in_room(room_id, requester)
logger.debug("%s has started typing in %s", target_user_id, room_id)
@@ -289,12 +288,11 @@ class TypingWriterHandler(FollowerTypingHandler):
self, target_user: UserID, requester: Requester, room_id: str
) -> None:
target_user_id = target_user.to_string()
- auth_user_id = requester.user.to_string()
if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this homeserver")
- if target_user_id != auth_user_id:
+ if target_user != requester.user:
raise AuthError(400, "Cannot set another user's typing state")
if requester.shadow_banned:
@@ -302,7 +300,7 @@ class TypingWriterHandler(FollowerTypingHandler):
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()
- await self.auth.check_user_in_room(room_id, target_user_id)
+ await self.auth.check_user_in_room(room_id, requester)
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
@@ -489,8 +487,15 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
handler = self.get_typing_handler()
events = []
- for room_id in handler._room_serials.keys():
- if handler._room_serials[room_id] <= from_key:
+
+ # Work on a copy of things here as these may change in the handler while
+ # waiting for the AS `is_interested_in_room` call to complete.
+ # Shallow copy is safe as no nested data is present.
+ latest_room_serial = handler._latest_room_serial
+ room_serials = handler._room_serials.copy()
+
+ for room_id, serial in room_serials.items():
+ if serial <= from_key:
continue
if not await service.is_interested_in_room(room_id, self._main_store):
@@ -498,7 +503,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
events.append(self._make_event_for(room_id))
- return events, handler._latest_room_serial
+ return events, latest_room_serial
async def get_new_events(
self,
|