diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9abdad262b..7833e77e2b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -462,9 +462,9 @@ class ApplicationServicesHandler:
Args:
room_alias: The room alias to query.
+
Returns:
- namedtuple: with keys "room_id" and "servers" or None if no
- association can be found.
+ RoomAliasMapping or None if no association can be found.
"""
room_alias_str = room_alias.to_string()
services = self.store.get_app_services()
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 61607cf2ba..84724b207c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -997,9 +997,7 @@ class AuthHandler:
# really don't want is active access_tokens without a record of the
# device, so we double-check it here.
if device_id is not None:
- try:
- await self.store.get_device(user_id, device_id)
- except StoreError:
+ if await self.store.get_device(user_id, device_id) is None:
await self.store.delete_access_token(access_token)
raise StoreError(400, "Login raced against device deletion")
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 82ee11e921..7665425232 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,10 +106,10 @@ class DeviceWorkerHandler:
Raises:
errors.NotFoundError: if the device was not found
"""
- try:
- device = await self.store.get_device(user_id, device_id)
- except errors.StoreError:
- raise errors.NotFoundError
+ device = await self.store.get_device(user_id, device_id)
+ if device is None:
+ raise errors.NotFoundError()
+
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
@@ -602,6 +602,8 @@ class DeviceHandler(DeviceWorkerHandler):
access_token, device_id
)
old_device = await self.store.get_device(user_id, old_device_id)
+ if old_device is None:
+ raise errors.NotFoundError()
await self.store.update_device(user_id, device_id, old_device["display_name"])
# can't call self.delete_device because that will clobber the
# access token so call the storage layer directly
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7ee5c47fd9..082f521791 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -278,13 +278,15 @@ class DirectoryHandler:
users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
- servers = set(extra_servers) | set(servers)
+ servers_set = set(extra_servers) | set(servers)
# If this server is in the list of servers, return it first.
- if self.server_name in servers:
- servers = [self.server_name] + [s for s in servers if s != self.server_name]
+ if self.server_name in servers_set:
+ servers = [self.server_name] + [
+ s for s in servers_set if s != self.server_name
+ ]
else:
- servers = list(servers)
+ servers = list(servers_set)
return {"room_id": room_id, "servers": servers}
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 60c11e3d21..14360b4e40 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -65,8 +65,12 @@ class E2eKeysHandler:
else:
# Only register this edu handler on master as it requires writing
# device updates to the db
- #
- # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+ federation_registry.register_edu_handler(
+ "m.signing_key_update",
+ self._edu_updater.incoming_signing_key_update,
+ )
+ # also handle the unstable version
+ # FIXME: remove this when enough servers have upgraded
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
@@ -576,7 +580,9 @@ class E2eKeysHandler:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
- fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
+ fallback_keys = keys.get("fallback_keys") or keys.get(
+ "org.matrix.msc2732.fallback_keys"
+ )
if fallback_keys and isinstance(fallback_keys, dict):
log_kv(
{
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 31742236a9..12614b2c5d 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -14,7 +14,9 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, Dict, Optional
+
+from typing_extensions import Literal
from synapse.api.errors import (
Codes,
@@ -24,6 +26,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
+from synapse.storage.databases.main.e2e_room_keys import RoomKey
from synapse.types import JsonDict
from synapse.util.async_helpers import Linearizer
@@ -58,7 +61,9 @@ class E2eRoomKeysHandler:
version: str,
room_id: Optional[str] = None,
session_id: Optional[str] = None,
- ) -> List[JsonDict]:
+ ) -> Dict[
+ Literal["rooms"], Dict[str, Dict[Literal["sessions"], Dict[str, RoomKey]]]
+ ]:
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
room, or a given session.
See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
@@ -72,8 +77,8 @@ class E2eRoomKeysHandler:
Raises:
NotFoundError: if the backup version does not exist
Returns:
- A list of dicts giving the session_data and message metadata for
- these room keys.
+ A dict giving the session_data and message metadata for these room keys.
+ `{"rooms": {room_id: {"sessions": {session_id: room_key}}}}`
"""
# we deliberately take the lock to get keys so that changing the version
@@ -273,7 +278,7 @@ class E2eRoomKeysHandler:
@staticmethod
def _should_replace_room_key(
- current_room_key: Optional[JsonDict], room_key: JsonDict
+ current_room_key: Optional[RoomKey], room_key: RoomKey
) -> bool:
"""
Determine whether to replace a given current_room_key (if any)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 32b0254c5f..1b996c420d 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -79,13 +79,14 @@ class EventStreamHandler:
# thundering herds on restart.
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
- events, tokens = await self.notifier.get_events_for(
+ stream_result = await self.notifier.get_events_for(
auth_user,
pagin_config,
timeout,
is_guest=is_guest,
explicit_room_id=room_id,
)
+ events = stream_result.events
time_now = self.clock.time_msec()
@@ -122,14 +123,12 @@ class EventStreamHandler:
events,
time_now,
as_client_event=as_client_event,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
)
chunk = {
"chunk": chunks,
- "start": await tokens[0].to_string(self.store),
- "end": await tokens[1].to_string(self.store),
+ "start": await stream_result.start_token.to_string(self.store),
+ "end": await stream_result.end_token.to_string(self.store),
}
return chunk
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1ea837d082..26b8e3f43c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -360,31 +360,34 @@ class FederationHandler:
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
- states = await make_deferred_yieldable(
+ states_list = await make_deferred_yieldable(
defer.gatherResults(
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
)
)
- # dict[str, dict[tuple, str]], a map from event_id to state map of
- # event_ids.
- states = dict(zip(event_ids, [s.state for s in states]))
+ # A map from event_id to state map of event_ids.
+ state_ids: Dict[str, StateMap[str]] = dict(
+ zip(event_ids, [s.state for s in states_list])
+ )
state_map = await self.store.get_events(
- [e_id for ids in states.values() for e_id in ids.values()],
+ [e_id for ids in state_ids.values() for e_id in ids.values()],
get_prev_content=False,
)
- states = {
+
+ # A map from event_id to state map of events.
+ state_events: Dict[str, StateMap[EventBase]] = {
key: {
k: state_map[e_id]
for k, e_id in state_dict.items()
if e_id in state_map
}
- for key, state_dict in states.items()
+ for key, state_dict in state_ids.items()
}
for e_id in event_ids:
- likely_extremeties_domains = get_domains_from_state(states[e_id])
+ likely_extremeties_domains = get_domains_from_state(state_events[e_id])
success = await try_backfill(
[
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..11771f3c9c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -421,9 +421,6 @@ class FederationEventHandler:
Raises:
SynapseError if the response is in some way invalid.
"""
- for e in itertools.chain(auth_events, state):
- e.internal_metadata.outlier = True
-
event_map = {e.event_id: e for e in itertools.chain(auth_events, state)}
create_event = None
@@ -666,7 +663,9 @@ class FederationEventHandler:
logger.info("Processing pulled event %s", event)
# these should not be outliers.
- assert not event.internal_metadata.is_outlier()
+ assert (
+ not event.internal_metadata.is_outlier()
+ ), "pulled event unexpectedly flagged as outlier"
event_id = event.event_id
@@ -1192,7 +1191,6 @@ class FederationEventHandler:
[destination],
event_id,
room_version,
- outlier=True,
)
if event is None:
logger.warning(
@@ -1221,9 +1219,10 @@ class FederationEventHandler:
"""Persist a batch of outlier events fetched from remote servers.
We first sort the events to make sure that we process each event's auth_events
- before the event itself, and then auth and persist them.
+ before the event itself.
- Notifies about the events where appropriate.
+ We then mark the events as outliers, persist them to the database, and, where
+ appropriate (eg, an invite), awake the notifier.
Params:
room_id: the room that the events are meant to be in (though this has
@@ -1274,7 +1273,8 @@ class FederationEventHandler:
Persists a batch of events where we have (theoretically) already persisted all
of their auth events.
- Notifies about the events where appropriate.
+ Marks the events as outliers, auths them, persists them to the database, and,
+ where appropriate (eg, an invite), awakes the notifier.
Params:
origin: where the events came from
@@ -1312,6 +1312,9 @@ class FederationEventHandler:
return None
auth.append(ae)
+ # we're not bothering about room state, so flag the event as an outlier.
+ event.internal_metadata.outlier = True
+
context = EventContext.for_outlier()
try:
validate_event_for_room_version(room_version_obj, event)
@@ -1838,7 +1841,7 @@ class FederationEventHandler:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
- return self._store.get_current_events_token()
+ return self._store.get_room_max_stream_ordering()
instance = self._config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9cd21e7f2b..601bab67f9 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,21 +13,27 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple
-
-from twisted.internet import defer
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+ JsonDict,
+ Requester,
+ RoomStreamToken,
+ StateMap,
+ StreamToken,
+ UserID,
+)
from synapse.util import unwrapFirstError
-from synapse.util.async_helpers import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.visibility import filter_events_for_client
@@ -167,8 +173,6 @@ class InitialSyncHandler:
d["invite"] = await self._event_serializer.serialize_event(
invite_event,
time_now,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
as_client_event=as_client_event,
)
@@ -190,14 +194,13 @@ class InitialSyncHandler:
)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
- )
- deferred_room_state.addCallback(
- lambda states: states[event.event_id]
+ ).addCallback(
+ lambda states: cast(StateMap[EventBase], states[event.event_id])
)
(messages, token), current_state = await make_deferred_yieldable(
- defer.gatherResults(
- [
+ gather_results(
+ (
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
@@ -205,7 +208,7 @@ class InitialSyncHandler:
end_token=room_end_token,
),
deferred_room_state,
- ]
+ )
)
).addErrback(unwrapFirstError)
@@ -222,8 +225,6 @@ class InitialSyncHandler:
await self._event_serializer.serialize_events(
messages,
time_now=time_now,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
as_client_event=as_client_event,
)
),
@@ -234,8 +235,6 @@ class InitialSyncHandler:
d["state"] = await self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
- # Don't bundle aggregations as this is a deprecated API.
- bundle_aggregations=False,
as_client_event=as_client_event,
)
@@ -377,9 +376,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
- await self._event_serializer.serialize_events(
- messages, time_now, bundle_aggregations=False
- )
+ await self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
@@ -387,7 +384,7 @@ class InitialSyncHandler:
"state": (
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(
- room_state.values(), time_now, bundle_aggregations=False
+ room_state.values(), time_now
)
),
"presence": [],
@@ -408,7 +405,7 @@ class InitialSyncHandler:
time_now = self.clock.time_msec()
# Don't bundle aggregations as this is a deprecated API.
state = await self._event_serializer.serialize_events(
- current_state.values(), time_now, bundle_aggregations=False
+ current_state.values(), time_now
)
now_token = self.hs.get_event_sources().get_current_token()
@@ -454,8 +451,8 @@ class InitialSyncHandler:
return receipts
presence, receipts, (messages, token) = await make_deferred_yieldable(
- defer.gatherResults(
- [
+ gather_results(
+ (
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
@@ -464,7 +461,7 @@ class InitialSyncHandler:
limit=limit,
end_token=now_token.room_key,
),
- ],
+ ),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
@@ -483,9 +480,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
- await self._event_serializer.serialize_events(
- messages, time_now, bundle_aggregations=False
- )
+ await self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b41dd38ef..d3e8303b83 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
-from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
@@ -57,7 +56,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder, log_failure
-from synapse.util.async_helpers import Linearizer, unwrapFirstError
+from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@@ -496,6 +495,7 @@ class EventCreationHandler:
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
+ allow_no_prev_events: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
@@ -607,6 +607,7 @@ class EventCreationHandler:
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
+ allow_no_prev_events=allow_no_prev_events,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -882,6 +883,7 @@ class EventCreationHandler:
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
+ allow_no_prev_events: bool = False,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -912,6 +914,7 @@ class EventCreationHandler:
full_state_ids_at_event = None
if auth_event_ids is not None:
# If auth events are provided, prev events must be also.
+ # prev_event_ids could be an empty array though.
assert prev_event_ids is not None
# Copy the full auth state before it stripped down
@@ -943,14 +946,22 @@ class EventCreationHandler:
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
- # we now ought to have some prev_events (unless it's a create event).
- #
- # do a quick sanity check here, rather than waiting until we've created the
+ # Do a quick sanity check here, rather than waiting until we've created the
# event and then try to auth it (which fails with a somewhat confusing "No
# create event in auth events")
- assert (
- builder.type == EventTypes.Create or len(prev_event_ids) > 0
- ), "Attempting to create an event with no prev_events"
+ if allow_no_prev_events:
+ # We allow events with no `prev_events` but it better have some `auth_events`
+ assert (
+ builder.type == EventTypes.Create
+ # Allow an event to have empty list of prev_event_ids
+ # only if it has auth_event_ids.
+ or auth_event_ids
+ ), "Attempting to create a non-m.room.create event with no prev_events or auth_event_ids"
+ else:
+ # we now ought to have some prev_events (unless it's a create event).
+ assert (
+ builder.type == EventTypes.Create or prev_event_ids
+ ), "Attempting to create a non-m.room.create event with no prev_events"
event = await builder.build(
prev_event_ids=prev_event_ids,
@@ -1156,9 +1167,9 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
- result = await make_deferred_yieldable(
- defer.gatherResults(
- [
+ result, _ = await make_deferred_yieldable(
+ gather_results(
+ (
run_in_background(
self._persist_event,
requester=requester,
@@ -1170,12 +1181,12 @@ class EventCreationHandler:
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
- ],
+ ),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
- return result[0]
+ return result
async def _persist_event(
self,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 4f42438053..7469cc55a2 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -542,7 +542,10 @@ class PaginationHandler:
chunk = {
"chunk": (
await self._event_serializer.serialize_events(
- events, time_now, as_client_event=as_client_event
+ events,
+ time_now,
+ bundle_aggregations=True,
+ as_client_event=as_client_event,
)
),
"start": await from_token.to_string(self.store),
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 454d06c973..c781fefb1b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler):
# Presence is best effort and quickly heals itself, so lets just always
# stream from the current state when we restart.
- self._event_pos = self.store.get_current_events_token()
+ self._event_pos = self.store.get_room_max_stream_ordering()
self._event_processing = False
async def _on_shutdown(self) -> None:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4911a11535..5cb1ff749d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -14,7 +14,7 @@
import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
-from synapse.api.constants import ReadReceiptEventFields
+from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
@@ -178,7 +178,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
for event_id in content.keys():
event_content = content.get(event_id, {})
- m_read = event_content.get("m.read", {})
+ m_read = event_content.get(ReceiptTypes.READ, {})
# If m_read is missing copy over the original event_content as there is nothing to process here
if not m_read:
@@ -206,7 +206,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
# Set new users unless empty
if len(new_users.keys()) > 0:
- new_event["content"][event_id] = {"m.read": new_users}
+ new_event["content"][event_id] = {ReceiptTypes.READ: new_users}
# Append new_event to visible_events unless empty
if len(new_event["content"].keys()) > 0:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ead2198e14..b9c1cbffa5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -172,7 +172,7 @@ class RoomCreationHandler:
user_id = requester.user.to_string()
# Check if this room is already being upgraded by another person
- for key in self._upgrade_response_cache.pending_result_cache:
+ for key in self._upgrade_response_cache.keys():
if key[0] == old_room_id and key[1] != user_id:
# Two different people are trying to upgrade the same room.
# Send the second an error.
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index ba7a14d651..1a33211a1f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -13,9 +13,9 @@
# limitations under the License.
import logging
-from collections import namedtuple
from typing import TYPE_CHECKING, Any, Optional, Tuple
+import attr
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
@@ -474,16 +474,12 @@ class RoomListHandler:
)
-class RoomListNextBatch(
- namedtuple(
- "RoomListNextBatch",
- (
- "last_joined_members", # The count to get rooms after/before
- "last_room_id", # The room_id to get rooms after/before
- "direction_is_forward", # Bool if this is a next_batch, false if prev_batch
- ),
- )
-):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class RoomListNextBatch:
+ last_joined_members: int # The count to get rooms after/before
+ last_room_id: str # The room_id to get rooms after/before
+ direction_is_forward: bool # True if this is a next_batch, false if prev_batch
+
KEY_DICT = {
"last_joined_members": "m",
"last_room_id": "r",
@@ -502,12 +498,12 @@ class RoomListNextBatch(
def to_token(self) -> str:
return encode_base64(
msgpack.dumps(
- {self.KEY_DICT[key]: val for key, val in self._asdict().items()}
+ {self.KEY_DICT[key]: val for key, val in attr.asdict(self).items()}
)
)
def copy_and_replace(self, **kwds: Any) -> "RoomListNextBatch":
- return self._replace(**kwds)
+ return attr.evolve(self, **kwds)
def _matches_room_entry(room_entry: JsonDict, search_filter: dict) -> bool:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index cac76d0221..27e2903a8f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -678,7 +678,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")
- if prev_event_ids:
+ # An empty prev_events list is allowed as long as the auth_event_ids are present
+ if prev_event_ids is not None:
return await self._local_membership_update(
requester=requester,
target=target,
@@ -1039,7 +1040,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Add new room to the room directory if the old room was there
# Remove old room from the room directory
old_room = await self.store.get_room(old_room_id)
- if old_room and old_room["is_public"]:
+ if old_room is not None and old_room["is_public"]:
await self.store.set_room_is_public(old_room_id, False)
await self.store.set_room_is_public(room_id, True)
@@ -1050,7 +1051,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
for group_id in local_group_ids:
# Add new the new room to those groups
- await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+ await self.store.add_room_to_group(
+ group_id, room_id, old_room is not None and old_room["is_public"]
+ )
# Remove the old room from those groups
await self.store.remove_room_from_group(group_id, old_room_id)
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index bd3e6f2ec7..29e41a4c79 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -80,6 +80,17 @@ class StatsHandler:
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = await self.store.get_stats_positions()
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos > room_max_stream_ordering:
+ # apparently, we've processed more events than exist in the database!
+ # this can happen if events are removed with history purge or similar.
+ logger.warning(
+ "Event stream ordering appears to have gone backwards (%i -> %i): "
+ "rewinding stats processor",
+ self.pos,
+ room_max_stream_ordering,
+ )
+ self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f3039c3c3f..7baf3f199c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,7 +28,7 @@ from typing import (
import attr
from prometheus_client import Counter
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -36,6 +36,7 @@ from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.databases.main.event_push_actions import NotifCounts
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -421,7 +422,7 @@ class SyncHandler:
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.
"""
- with start_active_span("current_sync_for_user"):
+ with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
@@ -1041,18 +1042,17 @@ class SyncHandler:
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
- ) -> Dict[str, int]:
+ ) -> NotifCounts:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
- receipt_type="m.read",
+ receipt_type=ReceiptTypes.READ,
)
- notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
+ return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
- return notifs
async def generate_sync_result(
self,
@@ -1585,7 +1585,8 @@ class SyncHandler:
)
logger.debug("Generated room entry for %s", room_entry.room_id)
- await concurrently_execute(handle_room_entries, room_entries, 10)
+ with start_active_span("sync.generate_room_entries"):
+ await concurrently_execute(handle_room_entries, room_entries, 10)
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)
@@ -1662,20 +1663,20 @@ class SyncHandler:
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.
- Ideally, we want to report all events whose stream ordering `s` lies in the
- range `since_token < s <= now_token`, where the two tokens are read from the
- sync_result_builder.
+ This function is a first pass at generating the rooms part of the sync response.
+ It determines which rooms have changed during the sync period, and categorises
+ them into four buckets: "knock", "invite", "join" and "leave".
- If there are too many events in that range to report, things get complicated.
- In this situation we return a truncated list of the most recent events, and
- indicate in the response that there is a "gap" of omitted events. Additionally:
+ 1. Finds all membership changes for the user in the sync period (from
+ `since_token` up to `now_token`).
+ 2. Uses those to place the room in one of the four categories above.
+ 3. Builds a `_RoomChanges` struct to record this, and return that struct.
- - we include a "state_delta", to describe the changes in state over the gap,
- - we include all membership events applying to the user making the request,
- even those in the gap.
-
- See the spec for the rationale:
- https://spec.matrix.org/v1.1/client-server-api/#syncing
+ For rooms classified as "knock", "invite" or "leave", we just need to report
+ a single membership event in the eventual /sync response. For "join" we need
+ to fetch additional non-membership events, e.g. messages in the room. That is
+ more complicated, so instead we report an intermediary `RoomSyncResultBuilder`
+ struct, and leave the additional work to `_generate_room_entry`.
The sync_result_builder is not modified by this function.
"""
@@ -1686,16 +1687,6 @@ class SyncHandler:
assert since_token
- # The spec
- # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
- # notes that membership events need special consideration:
- #
- # > When a sync is limited, the server MUST return membership events for events
- # > in the gap (between since and the start of the returned timeline), regardless
- # > as to whether or not they are redundant.
- #
- # We fetch such events here, but we only seem to use them for categorising rooms
- # as newly joined, newly left, invited or knocked.
# TODO: we've already called this function and ran this query in
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
@@ -2009,6 +2000,23 @@ class SyncHandler:
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
+ Ideally, we want to report all events whose stream ordering `s` lies in the
+ range `since_token < s <= now_token`, where the two tokens are read from the
+ sync_result_builder.
+
+ If there are too many events in that range to report, things get complicated.
+ In this situation we return a truncated list of the most recent events, and
+ indicate in the response that there is a "gap" of omitted events. Lots of this
+ is handled in `_load_filtered_recents`, but some of is handled in this method.
+
+ Additionally:
+ - we include a "state_delta", to describe the changes in state over the gap,
+ - we include all membership events applying to the user making the request,
+ even those in the gap.
+
+ See the spec for the rationale:
+ https://spec.matrix.org/v1.1/client-server-api/#syncing
+
Args:
sync_result_builder
ignored_users: Set of users ignored by user.
@@ -2038,7 +2046,7 @@ class SyncHandler:
since_token = room_builder.since_token
upto_token = room_builder.upto_token
- with start_active_span("generate_room_entry"):
+ with start_active_span("sync.generate_room_entry"):
set_tag("room_id", room_id)
log_kv({"events": len(events or ())})
@@ -2166,10 +2174,10 @@ class SyncHandler:
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
- unread_notifications["notification_count"] = notifs["notify_count"]
- unread_notifications["highlight_count"] = notifs["highlight_count"]
+ unread_notifications["notification_count"] = notifs.notify_count
+ unread_notifications["highlight_count"] = notifs.highlight_count
- room_sync.unread_count = notifs["unread_count"]
+ room_sync.unread_count = notifs.unread_count
sync_result_builder.joined.append(room_sync)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 1676ebd057..e43c22832d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -13,9 +13,10 @@
# limitations under the License.
import logging
import random
-from collections import namedtuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
+import attr
+
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import (
@@ -37,7 +38,10 @@ logger = logging.getLogger(__name__)
# A tiny object useful for storing a user's membership in a room, as a mapping
# key
-RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class RoomMember:
+ room_id: str
+ user_id: str
# How often we expect remote servers to resend us presence.
@@ -119,7 +123,7 @@ class FollowerTypingHandler:
self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
def is_typing(self, member: RoomMember) -> bool:
- return member.user_id in self._room_typing.get(member.room_id, [])
+ return member.user_id in self._room_typing.get(member.room_id, set())
async def _push_remote(self, member: RoomMember, typing: bool) -> None:
if not self.federation:
@@ -166,9 +170,9 @@ class FollowerTypingHandler:
for row in rows:
self._room_serials[row.room_id] = token
- prev_typing = set(self._room_typing.get(row.room_id, []))
+ prev_typing = self._room_typing.get(row.room_id, set())
now_typing = set(row.user_ids)
- self._room_typing[row.room_id] = row.user_ids
+ self._room_typing[row.room_id] = now_typing
if self.federation:
run_as_background_process(
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a0eb45446f..1565e034cb 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -148,9 +148,21 @@ class UserDirectoryHandler(StateDeltasHandler):
if self.pos is None:
self.pos = await self.store.get_user_directory_stream_pos()
- # If still None then the initial background update hasn't happened yet.
- if self.pos is None:
- return None
+ # If still None then the initial background update hasn't happened yet.
+ if self.pos is None:
+ return None
+
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos > room_max_stream_ordering:
+ # apparently, we've processed more events than exist in the database!
+ # this can happen if events are removed with history purge or similar.
+ logger.warning(
+ "Event stream ordering appears to have gone backwards (%i -> %i): "
+ "rewinding user directory processor",
+ self.pos,
+ room_max_stream_ordering,
+ )
+ self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date
while True:
|