summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/handlers/auth.py4
-rw-r--r--synapse/handlers/device.py10
-rw-r--r--synapse/handlers/directory.py10
-rw-r--r--synapse/handlers/e2e_keys.py12
-rw-r--r--synapse/handlers/e2e_room_keys.py15
-rw-r--r--synapse/handlers/events.py9
-rw-r--r--synapse/handlers/federation.py19
-rw-r--r--synapse/handlers/federation_event.py21
-rw-r--r--synapse/handlers/initial_sync.py51
-rw-r--r--synapse/handlers/message.py37
-rw-r--r--synapse/handlers/pagination.py5
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/handlers/receipts.py6
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/room_list.py22
-rw-r--r--synapse/handlers/room_member.py9
-rw-r--r--synapse/handlers/stats.py11
-rw-r--r--synapse/handlers/sync.py74
-rw-r--r--synapse/handlers/typing.py14
-rw-r--r--synapse/handlers/user_directory.py18
21 files changed, 208 insertions, 147 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py

index 9abdad262b..7833e77e2b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -462,9 +462,9 @@ class ApplicationServicesHandler: Args: room_alias: The room alias to query. + Returns: - namedtuple: with keys "room_id" and "servers" or None if no - association can be found. + RoomAliasMapping or None if no association can be found. """ room_alias_str = room_alias.to_string() services = self.store.get_app_services() diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 61607cf2ba..84724b207c 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py
@@ -997,9 +997,7 @@ class AuthHandler: # really don't want is active access_tokens without a record of the # device, so we double-check it here. if device_id is not None: - try: - await self.store.get_device(user_id, device_id) - except StoreError: + if await self.store.get_device(user_id, device_id) is None: await self.store.delete_access_token(access_token) raise StoreError(400, "Login raced against device deletion") diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 82ee11e921..7665425232 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -106,10 +106,10 @@ class DeviceWorkerHandler: Raises: errors.NotFoundError: if the device was not found """ - try: - device = await self.store.get_device(user_id, device_id) - except errors.StoreError: - raise errors.NotFoundError + device = await self.store.get_device(user_id, device_id) + if device is None: + raise errors.NotFoundError() + ips = await self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) @@ -602,6 +602,8 @@ class DeviceHandler(DeviceWorkerHandler): access_token, device_id ) old_device = await self.store.get_device(user_id, old_device_id) + if old_device is None: + raise errors.NotFoundError() await self.store.update_device(user_id, device_id, old_device["display_name"]) # can't call self.delete_device because that will clobber the # access token so call the storage layer directly diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7ee5c47fd9..082f521791 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py
@@ -278,13 +278,15 @@ class DirectoryHandler: users = await self.store.get_users_in_room(room_id) extra_servers = {get_domain_from_id(u) for u in users} - servers = set(extra_servers) | set(servers) + servers_set = set(extra_servers) | set(servers) # If this server is in the list of servers, return it first. - if self.server_name in servers: - servers = [self.server_name] + [s for s in servers if s != self.server_name] + if self.server_name in servers_set: + servers = [self.server_name] + [ + s for s in servers_set if s != self.server_name + ] else: - servers = list(servers) + servers = list(servers_set) return {"room_id": room_id, "servers": servers} diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 60c11e3d21..14360b4e40 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py
@@ -65,8 +65,12 @@ class E2eKeysHandler: else: # Only register this edu handler on master as it requires writing # device updates to the db - # - # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec + federation_registry.register_edu_handler( + "m.signing_key_update", + self._edu_updater.incoming_signing_key_update, + ) + # also handle the unstable version + # FIXME: remove this when enough servers have upgraded federation_registry.register_edu_handler( "org.matrix.signing_key_update", self._edu_updater.incoming_signing_key_update, @@ -576,7 +580,9 @@ class E2eKeysHandler: log_kv( {"message": "Did not update one_time_keys", "reason": "no keys given"} ) - fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None) + fallback_keys = keys.get("fallback_keys") or keys.get( + "org.matrix.msc2732.fallback_keys" + ) if fallback_keys and isinstance(fallback_keys, dict): log_kv( { diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 31742236a9..12614b2c5d 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py
@@ -14,7 +14,9 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Dict, Optional + +from typing_extensions import Literal from synapse.api.errors import ( Codes, @@ -24,6 +26,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.logging.opentracing import log_kv, trace +from synapse.storage.databases.main.e2e_room_keys import RoomKey from synapse.types import JsonDict from synapse.util.async_helpers import Linearizer @@ -58,7 +61,9 @@ class E2eRoomKeysHandler: version: str, room_id: Optional[str] = None, session_id: Optional[str] = None, - ) -> List[JsonDict]: + ) -> Dict[ + Literal["rooms"], Dict[str, Dict[Literal["sessions"], Dict[str, RoomKey]]] + ]: """Bulk get the E2E room keys for a given backup, optionally filtered to a given room, or a given session. See EndToEndRoomKeyStore.get_e2e_room_keys for full details. @@ -72,8 +77,8 @@ class E2eRoomKeysHandler: Raises: NotFoundError: if the backup version does not exist Returns: - A list of dicts giving the session_data and message metadata for - these room keys. + A dict giving the session_data and message metadata for these room keys. + `{"rooms": {room_id: {"sessions": {session_id: room_key}}}}` """ # we deliberately take the lock to get keys so that changing the version @@ -273,7 +278,7 @@ class E2eRoomKeysHandler: @staticmethod def _should_replace_room_key( - current_room_key: Optional[JsonDict], room_key: JsonDict + current_room_key: Optional[RoomKey], room_key: RoomKey ) -> bool: """ Determine whether to replace a given current_room_key (if any) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 32b0254c5f..1b996c420d 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py
@@ -79,13 +79,14 @@ class EventStreamHandler: # thundering herds on restart. timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1)) - events, tokens = await self.notifier.get_events_for( + stream_result = await self.notifier.get_events_for( auth_user, pagin_config, timeout, is_guest=is_guest, explicit_room_id=room_id, ) + events = stream_result.events time_now = self.clock.time_msec() @@ -122,14 +123,12 @@ class EventStreamHandler: events, time_now, as_client_event=as_client_event, - # Don't bundle aggregations as this is a deprecated API. - bundle_aggregations=False, ) chunk = { "chunk": chunks, - "start": await tokens[0].to_string(self.store), - "end": await tokens[1].to_string(self.store), + "start": await stream_result.start_token.to_string(self.store), + "end": await stream_result.end_token.to_string(self.store), } return chunk diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1ea837d082..26b8e3f43c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -360,31 +360,34 @@ class FederationHandler: logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events) - states = await make_deferred_yieldable( + states_list = await make_deferred_yieldable( defer.gatherResults( [resolve(room_id, [e]) for e in event_ids], consumeErrors=True ) ) - # dict[str, dict[tuple, str]], a map from event_id to state map of - # event_ids. - states = dict(zip(event_ids, [s.state for s in states])) + # A map from event_id to state map of event_ids. + state_ids: Dict[str, StateMap[str]] = dict( + zip(event_ids, [s.state for s in states_list]) + ) state_map = await self.store.get_events( - [e_id for ids in states.values() for e_id in ids.values()], + [e_id for ids in state_ids.values() for e_id in ids.values()], get_prev_content=False, ) - states = { + + # A map from event_id to state map of events. + state_events: Dict[str, StateMap[EventBase]] = { key: { k: state_map[e_id] for k, e_id in state_dict.items() if e_id in state_map } - for key, state_dict in states.items() + for key, state_dict in state_ids.items() } for e_id in event_ids: - likely_extremeties_domains = get_domains_from_state(states[e_id]) + likely_extremeties_domains = get_domains_from_state(state_events[e_id]) success = await try_backfill( [ diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..11771f3c9c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -421,9 +421,6 @@ class FederationEventHandler: Raises: SynapseError if the response is in some way invalid. """ - for e in itertools.chain(auth_events, state): - e.internal_metadata.outlier = True - event_map = {e.event_id: e for e in itertools.chain(auth_events, state)} create_event = None @@ -666,7 +663,9 @@ class FederationEventHandler: logger.info("Processing pulled event %s", event) # these should not be outliers. - assert not event.internal_metadata.is_outlier() + assert ( + not event.internal_metadata.is_outlier() + ), "pulled event unexpectedly flagged as outlier" event_id = event.event_id @@ -1192,7 +1191,6 @@ class FederationEventHandler: [destination], event_id, room_version, - outlier=True, ) if event is None: logger.warning( @@ -1221,9 +1219,10 @@ class FederationEventHandler: """Persist a batch of outlier events fetched from remote servers. We first sort the events to make sure that we process each event's auth_events - before the event itself, and then auth and persist them. + before the event itself. - Notifies about the events where appropriate. + We then mark the events as outliers, persist them to the database, and, where + appropriate (eg, an invite), awake the notifier. Params: room_id: the room that the events are meant to be in (though this has @@ -1274,7 +1273,8 @@ class FederationEventHandler: Persists a batch of events where we have (theoretically) already persisted all of their auth events. - Notifies about the events where appropriate. + Marks the events as outliers, auths them, persists them to the database, and, + where appropriate (eg, an invite), awakes the notifier. Params: origin: where the events came from @@ -1312,6 +1312,9 @@ class FederationEventHandler: return None auth.append(ae) + # we're not bothering about room state, so flag the event as an outlier. + event.internal_metadata.outlier = True + context = EventContext.for_outlier() try: validate_event_for_room_version(room_version_obj, event) @@ -1838,7 +1841,7 @@ class FederationEventHandler: The stream ID after which all events have been persisted. """ if not event_and_contexts: - return self._store.get_current_events_token() + return self._store.get_room_max_stream_ordering() instance = self._config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9cd21e7f2b..601bab67f9 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py
@@ -13,21 +13,27 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple - -from twisted.internet import defer +from typing import TYPE_CHECKING, List, Optional, Tuple, cast from synapse.api.constants import EduTypes, EventTypes, Membership from synapse.api.errors import SynapseError +from synapse.events import EventBase from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state from synapse.handlers.receipts import ReceiptEventSource from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.storage.roommember import RoomsForUser from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID +from synapse.types import ( + JsonDict, + Requester, + RoomStreamToken, + StateMap, + StreamToken, + UserID, +) from synapse.util import unwrapFirstError -from synapse.util.async_helpers import concurrently_execute +from synapse.util.async_helpers import concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache from synapse.visibility import filter_events_for_client @@ -167,8 +173,6 @@ class InitialSyncHandler: d["invite"] = await self._event_serializer.serialize_event( invite_event, time_now, - # Don't bundle aggregations as this is a deprecated API. - bundle_aggregations=False, as_client_event=as_client_event, ) @@ -190,14 +194,13 @@ class InitialSyncHandler: ) deferred_room_state = run_in_background( self.state_store.get_state_for_events, [event.event_id] - ) - deferred_room_state.addCallback( - lambda states: states[event.event_id] + ).addCallback( + lambda states: cast(StateMap[EventBase], states[event.event_id]) ) (messages, token), current_state = await make_deferred_yieldable( - defer.gatherResults( - [ + gather_results( + ( run_in_background( self.store.get_recent_events_for_room, event.room_id, @@ -205,7 +208,7 @@ class InitialSyncHandler: end_token=room_end_token, ), deferred_room_state, - ] + ) ) ).addErrback(unwrapFirstError) @@ -222,8 +225,6 @@ class InitialSyncHandler: await self._event_serializer.serialize_events( messages, time_now=time_now, - # Don't bundle aggregations as this is a deprecated API. - bundle_aggregations=False, as_client_event=as_client_event, ) ), @@ -234,8 +235,6 @@ class InitialSyncHandler: d["state"] = await self._event_serializer.serialize_events( current_state.values(), time_now=time_now, - # Don't bundle aggregations as this is a deprecated API. - bundle_aggregations=False, as_client_event=as_client_event, ) @@ -377,9 +376,7 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events( - messages, time_now, bundle_aggregations=False - ) + await self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), @@ -387,7 +384,7 @@ class InitialSyncHandler: "state": ( # Don't bundle aggregations as this is a deprecated API. await self._event_serializer.serialize_events( - room_state.values(), time_now, bundle_aggregations=False + room_state.values(), time_now ) ), "presence": [], @@ -408,7 +405,7 @@ class InitialSyncHandler: time_now = self.clock.time_msec() # Don't bundle aggregations as this is a deprecated API. state = await self._event_serializer.serialize_events( - current_state.values(), time_now, bundle_aggregations=False + current_state.values(), time_now ) now_token = self.hs.get_event_sources().get_current_token() @@ -454,8 +451,8 @@ class InitialSyncHandler: return receipts presence, receipts, (messages, token) = await make_deferred_yieldable( - defer.gatherResults( - [ + gather_results( + ( run_in_background(get_presence), run_in_background(get_receipts), run_in_background( @@ -464,7 +461,7 @@ class InitialSyncHandler: limit=limit, end_token=now_token.room_key, ), - ], + ), consumeErrors=True, ).addErrback(unwrapFirstError) ) @@ -483,9 +480,7 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events( - messages, time_now, bundle_aggregations=False - ) + await self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b41dd38ef..d3e8303b83 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json -from twisted.internet import defer from twisted.internet.interfaces import IDelayedCall from synapse import event_auth @@ -57,7 +56,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder, log_failure -from synapse.util.async_helpers import Linearizer, unwrapFirstError +from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -496,6 +495,7 @@ class EventCreationHandler: require_consent: bool = True, outlier: bool = False, historical: bool = False, + allow_no_prev_events: bool = False, depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """ @@ -607,6 +607,7 @@ class EventCreationHandler: prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, depth=depth, + allow_no_prev_events=allow_no_prev_events, ) # In an ideal world we wouldn't need the second part of this condition. However, @@ -882,6 +883,7 @@ class EventCreationHandler: prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, depth: Optional[int] = None, + allow_no_prev_events: bool = False, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client @@ -912,6 +914,7 @@ class EventCreationHandler: full_state_ids_at_event = None if auth_event_ids is not None: # If auth events are provided, prev events must be also. + # prev_event_ids could be an empty array though. assert prev_event_ids is not None # Copy the full auth state before it stripped down @@ -943,14 +946,22 @@ class EventCreationHandler: else: prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id) - # we now ought to have some prev_events (unless it's a create event). - # - # do a quick sanity check here, rather than waiting until we've created the + # Do a quick sanity check here, rather than waiting until we've created the # event and then try to auth it (which fails with a somewhat confusing "No # create event in auth events") - assert ( - builder.type == EventTypes.Create or len(prev_event_ids) > 0 - ), "Attempting to create an event with no prev_events" + if allow_no_prev_events: + # We allow events with no `prev_events` but it better have some `auth_events` + assert ( + builder.type == EventTypes.Create + # Allow an event to have empty list of prev_event_ids + # only if it has auth_event_ids. + or auth_event_ids + ), "Attempting to create a non-m.room.create event with no prev_events or auth_event_ids" + else: + # we now ought to have some prev_events (unless it's a create event). + assert ( + builder.type == EventTypes.Create or prev_event_ids + ), "Attempting to create a non-m.room.create event with no prev_events" event = await builder.build( prev_event_ids=prev_event_ids, @@ -1156,9 +1167,9 @@ class EventCreationHandler: # We now persist the event (and update the cache in parallel, since we # don't want to block on it). - result = await make_deferred_yieldable( - defer.gatherResults( - [ + result, _ = await make_deferred_yieldable( + gather_results( + ( run_in_background( self._persist_event, requester=requester, @@ -1170,12 +1181,12 @@ class EventCreationHandler: run_in_background( self.cache_joined_hosts_for_event, event, context ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), - ], + ), consumeErrors=True, ) ).addErrback(unwrapFirstError) - return result[0] + return result async def _persist_event( self, diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 4f42438053..7469cc55a2 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py
@@ -542,7 +542,10 @@ class PaginationHandler: chunk = { "chunk": ( await self._event_serializer.serialize_events( - events, time_now, as_client_event=as_client_event + events, + time_now, + bundle_aggregations=True, + as_client_event=as_client_event, ) ), "start": await from_token.to_string(self.store), diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 454d06c973..c781fefb1b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler): # Presence is best effort and quickly heals itself, so lets just always # stream from the current state when we restart. - self._event_pos = self.store.get_current_events_token() + self._event_pos = self.store.get_room_max_stream_ordering() self._event_processing = False async def _on_shutdown(self) -> None: diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4911a11535..5cb1ff749d 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py
@@ -14,7 +14,7 @@ import logging from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple -from synapse.api.constants import ReadReceiptEventFields +from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes from synapse.appservice import ApplicationService from synapse.streams import EventSource from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id @@ -178,7 +178,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]): for event_id in content.keys(): event_content = content.get(event_id, {}) - m_read = event_content.get("m.read", {}) + m_read = event_content.get(ReceiptTypes.READ, {}) # If m_read is missing copy over the original event_content as there is nothing to process here if not m_read: @@ -206,7 +206,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]): # Set new users unless empty if len(new_users.keys()) > 0: - new_event["content"][event_id] = {"m.read": new_users} + new_event["content"][event_id] = {ReceiptTypes.READ: new_users} # Append new_event to visible_events unless empty if len(new_event["content"].keys()) > 0: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ead2198e14..b9c1cbffa5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -172,7 +172,7 @@ class RoomCreationHandler: user_id = requester.user.to_string() # Check if this room is already being upgraded by another person - for key in self._upgrade_response_cache.pending_result_cache: + for key in self._upgrade_response_cache.keys(): if key[0] == old_room_id and key[1] != user_id: # Two different people are trying to upgrade the same room. # Send the second an error. diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index ba7a14d651..1a33211a1f 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py
@@ -13,9 +13,9 @@ # limitations under the License. import logging -from collections import namedtuple from typing import TYPE_CHECKING, Any, Optional, Tuple +import attr import msgpack from unpaddedbase64 import decode_base64, encode_base64 @@ -474,16 +474,12 @@ class RoomListHandler: ) -class RoomListNextBatch( - namedtuple( - "RoomListNextBatch", - ( - "last_joined_members", # The count to get rooms after/before - "last_room_id", # The room_id to get rooms after/before - "direction_is_forward", # Bool if this is a next_batch, false if prev_batch - ), - ) -): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class RoomListNextBatch: + last_joined_members: int # The count to get rooms after/before + last_room_id: str # The room_id to get rooms after/before + direction_is_forward: bool # True if this is a next_batch, false if prev_batch + KEY_DICT = { "last_joined_members": "m", "last_room_id": "r", @@ -502,12 +498,12 @@ class RoomListNextBatch( def to_token(self) -> str: return encode_base64( msgpack.dumps( - {self.KEY_DICT[key]: val for key, val in self._asdict().items()} + {self.KEY_DICT[key]: val for key, val in attr.asdict(self).items()} ) ) def copy_and_replace(self, **kwds: Any) -> "RoomListNextBatch": - return self._replace(**kwds) + return attr.evolve(self, **kwds) def _matches_room_entry(room_entry: JsonDict, search_filter: dict) -> bool: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index cac76d0221..27e2903a8f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -678,7 +678,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if block_invite: raise SynapseError(403, "Invites have been disabled on this server") - if prev_event_ids: + # An empty prev_events list is allowed as long as the auth_event_ids are present + if prev_event_ids is not None: return await self._local_membership_update( requester=requester, target=target, @@ -1039,7 +1040,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # Add new room to the room directory if the old room was there # Remove old room from the room directory old_room = await self.store.get_room(old_room_id) - if old_room and old_room["is_public"]: + if old_room is not None and old_room["is_public"]: await self.store.set_room_is_public(old_room_id, False) await self.store.set_room_is_public(room_id, True) @@ -1050,7 +1051,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): local_group_ids = await self.store.get_local_groups_for_room(old_room_id) for group_id in local_group_ids: # Add new the new room to those groups - await self.store.add_room_to_group(group_id, room_id, old_room["is_public"]) + await self.store.add_room_to_group( + group_id, room_id, old_room is not None and old_room["is_public"] + ) # Remove the old room from those groups await self.store.remove_room_from_group(group_id, old_room_id) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index bd3e6f2ec7..29e41a4c79 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py
@@ -80,6 +80,17 @@ class StatsHandler: # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = await self.store.get_stats_positions() + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding stats processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering # Loop round handling deltas until we're up to date diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f3039c3c3f..7baf3f199c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -28,7 +28,7 @@ from typing import ( import attr from prometheus_client import Counter -from synapse.api.constants import AccountDataTypes, EventTypes, Membership +from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS @@ -36,6 +36,7 @@ from synapse.events import EventBase from synapse.logging.context import current_context from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user +from synapse.storage.databases.main.event_push_actions import NotifCounts from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( @@ -421,7 +422,7 @@ class SyncHandler: span to track the sync. See `generate_sync_result` for the next part of your indoctrination. """ - with start_active_span("current_sync_for_user"): + with start_active_span("sync.current_sync_for_user"): log_kv({"since_token": since_token}) sync_result = await self.generate_sync_result( sync_config, since_token, full_state @@ -1041,18 +1042,17 @@ class SyncHandler: async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig - ) -> Dict[str, int]: + ) -> NotifCounts: with Measure(self.clock, "unread_notifs_for_room_id"): last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( user_id=sync_config.user.to_string(), room_id=room_id, - receipt_type="m.read", + receipt_type=ReceiptTypes.READ, ) - notifs = await self.store.get_unread_event_push_actions_by_room_for_user( + return await self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id ) - return notifs async def generate_sync_result( self, @@ -1585,7 +1585,8 @@ class SyncHandler: ) logger.debug("Generated room entry for %s", room_entry.room_id) - await concurrently_execute(handle_room_entries, room_entries, 10) + with start_active_span("sync.generate_room_entries"): + await concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builder.invited.extend(invited) sync_result_builder.knocked.extend(knocked) @@ -1662,20 +1663,20 @@ class SyncHandler: ) -> _RoomChanges: """Determine the changes in rooms to report to the user. - Ideally, we want to report all events whose stream ordering `s` lies in the - range `since_token < s <= now_token`, where the two tokens are read from the - sync_result_builder. + This function is a first pass at generating the rooms part of the sync response. + It determines which rooms have changed during the sync period, and categorises + them into four buckets: "knock", "invite", "join" and "leave". - If there are too many events in that range to report, things get complicated. - In this situation we return a truncated list of the most recent events, and - indicate in the response that there is a "gap" of omitted events. Additionally: + 1. Finds all membership changes for the user in the sync period (from + `since_token` up to `now_token`). + 2. Uses those to place the room in one of the four categories above. + 3. Builds a `_RoomChanges` struct to record this, and return that struct. - - we include a "state_delta", to describe the changes in state over the gap, - - we include all membership events applying to the user making the request, - even those in the gap. - - See the spec for the rationale: - https://spec.matrix.org/v1.1/client-server-api/#syncing + For rooms classified as "knock", "invite" or "leave", we just need to report + a single membership event in the eventual /sync response. For "join" we need + to fetch additional non-membership events, e.g. messages in the room. That is + more complicated, so instead we report an intermediary `RoomSyncResultBuilder` + struct, and leave the additional work to `_generate_room_entry`. The sync_result_builder is not modified by this function. """ @@ -1686,16 +1687,6 @@ class SyncHandler: assert since_token - # The spec - # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync - # notes that membership events need special consideration: - # - # > When a sync is limited, the server MUST return membership events for events - # > in the gap (between since and the start of the returned timeline), regardless - # > as to whether or not they are redundant. - # - # We fetch such events here, but we only seem to use them for categorising rooms - # as newly joined, newly left, invited or knocked. # TODO: we've already called this function and ran this query in # _have_rooms_changed. We could keep the results in memory to avoid a # second query, at the cost of more complicated source code. @@ -2009,6 +2000,23 @@ class SyncHandler: """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. + Ideally, we want to report all events whose stream ordering `s` lies in the + range `since_token < s <= now_token`, where the two tokens are read from the + sync_result_builder. + + If there are too many events in that range to report, things get complicated. + In this situation we return a truncated list of the most recent events, and + indicate in the response that there is a "gap" of omitted events. Lots of this + is handled in `_load_filtered_recents`, but some of is handled in this method. + + Additionally: + - we include a "state_delta", to describe the changes in state over the gap, + - we include all membership events applying to the user making the request, + even those in the gap. + + See the spec for the rationale: + https://spec.matrix.org/v1.1/client-server-api/#syncing + Args: sync_result_builder ignored_users: Set of users ignored by user. @@ -2038,7 +2046,7 @@ class SyncHandler: since_token = room_builder.since_token upto_token = room_builder.upto_token - with start_active_span("generate_room_entry"): + with start_active_span("sync.generate_room_entry"): set_tag("room_id", room_id) log_kv({"events": len(events or ())}) @@ -2166,10 +2174,10 @@ class SyncHandler: if room_sync or always_include: notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] + unread_notifications["notification_count"] = notifs.notify_count + unread_notifications["highlight_count"] = notifs.highlight_count - room_sync.unread_count = notifs["unread_count"] + room_sync.unread_count = notifs.unread_count sync_result_builder.joined.append(room_sync) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 1676ebd057..e43c22832d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py
@@ -13,9 +13,10 @@ # limitations under the License. import logging import random -from collections import namedtuple from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple +import attr + from synapse.api.errors import AuthError, ShadowBanError, SynapseError from synapse.appservice import ApplicationService from synapse.metrics.background_process_metrics import ( @@ -37,7 +38,10 @@ logger = logging.getLogger(__name__) # A tiny object useful for storing a user's membership in a room, as a mapping # key -RoomMember = namedtuple("RoomMember", ("room_id", "user_id")) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class RoomMember: + room_id: str + user_id: str # How often we expect remote servers to resend us presence. @@ -119,7 +123,7 @@ class FollowerTypingHandler: self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000) def is_typing(self, member: RoomMember) -> bool: - return member.user_id in self._room_typing.get(member.room_id, []) + return member.user_id in self._room_typing.get(member.room_id, set()) async def _push_remote(self, member: RoomMember, typing: bool) -> None: if not self.federation: @@ -166,9 +170,9 @@ class FollowerTypingHandler: for row in rows: self._room_serials[row.room_id] = token - prev_typing = set(self._room_typing.get(row.room_id, [])) + prev_typing = self._room_typing.get(row.room_id, set()) now_typing = set(row.user_ids) - self._room_typing[row.room_id] = row.user_ids + self._room_typing[row.room_id] = now_typing if self.federation: run_as_background_process( diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a0eb45446f..1565e034cb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py
@@ -148,9 +148,21 @@ class UserDirectoryHandler(StateDeltasHandler): if self.pos is None: self.pos = await self.store.get_user_directory_stream_pos() - # If still None then the initial background update hasn't happened yet. - if self.pos is None: - return None + # If still None then the initial background update hasn't happened yet. + if self.pos is None: + return None + + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding user directory processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering # Loop round handling deltas until we're up to date while True: