diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 4af9fbc5d1..0478448b47 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -23,7 +23,7 @@ from synapse.replication.http.account_data import (
ReplicationUserAccountDataRestServlet,
)
from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -105,7 +105,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
await self._notify_modules(user_id, room_id, account_data_type, content)
@@ -141,7 +141,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
await self._notify_modules(user_id, None, account_data_type, content)
@@ -176,7 +176,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
return max_stream_id
else:
@@ -201,7 +201,7 @@ class AccountDataHandler:
)
self._notifier.on_new_event(
- "account_data_key", max_stream_id, users=[user_id]
+ StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
return max_stream_id
else:
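# The hunks above swap the literal "account_data_key" for StreamKeyType.ACCOUNT_DATA.
# A minimal sketch of a string-valued stream-key constants class, assuming plain `str`
# values so comparisons against the old literals keep working (illustrative only, not
# the actual Synapse definition):
from typing import Final


class StreamKeyTypeSketch:
    ROOM: Final = "room_key"
    PRESENCE: Final = "presence_key"
    TYPING: Final = "typing_key"
    RECEIPT: Final = "receipt_key"
    ACCOUNT_DATA: Final = "account_data_key"
    TO_DEVICE: Final = "to_device_key"
    DEVICE_LIST: Final = "device_list_key"


# Call sites can migrate gradually because the constant still equals the old literal.
assert StreamKeyTypeSketch.ACCOUNT_DATA == "account_data_key"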
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 85bd5e4768..1da7bcc85b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -38,6 +38,7 @@ from synapse.types import (
JsonDict,
RoomAlias,
RoomStreamToken,
+ StreamKeyType,
UserID,
)
from synapse.util.async_helpers import Linearizer
@@ -213,8 +214,8 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
- `stream_key` can be "typing_key", "receipt_key", "presence_key",
- "to_device_key" or "device_list_key". Any other value for `stream_key`
+ `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE,
+ StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key`
will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
@@ -235,11 +236,11 @@ class ApplicationServicesHandler:
# Only the following streams are currently supported.
# FIXME: We should use constants for these values.
if stream_key not in (
- "typing_key",
- "receipt_key",
- "presence_key",
- "to_device_key",
- "device_list_key",
+ StreamKeyType.TYPING,
+ StreamKeyType.RECEIPT,
+ StreamKeyType.PRESENCE,
+ StreamKeyType.TO_DEVICE,
+ StreamKeyType.DEVICE_LIST,
):
return
@@ -258,14 +259,14 @@ class ApplicationServicesHandler:
# Ignore to-device messages if the feature flag is not enabled
if (
- stream_key == "to_device_key"
+ stream_key == StreamKeyType.TO_DEVICE
and not self._msc2409_to_device_messages_enabled
):
return
# Ignore device lists if the feature flag is not enabled
if (
- stream_key == "device_list_key"
+ stream_key == StreamKeyType.DEVICE_LIST
and not self._msc3202_transaction_extensions_enabled
):
return
@@ -283,15 +284,15 @@ class ApplicationServicesHandler:
if (
stream_key
in (
- "typing_key",
- "receipt_key",
- "presence_key",
- "to_device_key",
+ StreamKeyType.TYPING,
+ StreamKeyType.RECEIPT,
+ StreamKeyType.PRESENCE,
+ StreamKeyType.TO_DEVICE,
)
and service.supports_ephemeral
)
or (
- stream_key == "device_list_key"
+ stream_key == StreamKeyType.DEVICE_LIST
and service.msc3202_transaction_extensions
)
]
@@ -317,7 +318,7 @@ class ApplicationServicesHandler:
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
- if stream_key == "typing_key":
+ if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
@@ -333,7 +334,7 @@ class ApplicationServicesHandler:
async with self._ephemeral_events_linearizer.queue(
(service.id, stream_key)
):
- if stream_key == "receipt_key":
+ if stream_key == StreamKeyType.RECEIPT:
events = await self._handle_receipts(service, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -342,7 +343,7 @@ class ApplicationServicesHandler:
service, "read_receipt", new_token
)
- elif stream_key == "presence_key":
+ elif stream_key == StreamKeyType.PRESENCE:
events = await self._handle_presence(service, users, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -351,7 +352,7 @@ class ApplicationServicesHandler:
service, "presence", new_token
)
- elif stream_key == "to_device_key":
+ elif stream_key == StreamKeyType.TO_DEVICE:
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
@@ -366,7 +367,7 @@ class ApplicationServicesHandler:
service, "to_device", new_token
)
- elif stream_key == "device_list_key":
+ elif stream_key == StreamKeyType.DEVICE_LIST:
device_list_summary = await self._get_device_list_summary(
service, new_token
)
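# The gating logic above can be read as a small predicate: ephemeral appservice pushes
# only happen for a fixed set of stream keys, and the to-device and device-list streams
# are additionally gated behind experimental feature flags. A hedged sketch of that
# shape (the function name and flag parameters are illustrative, not the real handler
# API); StreamKeyType is the constant class imported in the hunk above.
from synapse.types import StreamKeyType

SUPPORTED_EPHEMERAL_STREAMS = frozenset(
    {
        StreamKeyType.TYPING,
        StreamKeyType.RECEIPT,
        StreamKeyType.PRESENCE,
        StreamKeyType.TO_DEVICE,
        StreamKeyType.DEVICE_LIST,
    }
)


def should_notify_ephemeral(
    stream_key: str, msc2409_enabled: bool, msc3202_enabled: bool
) -> bool:
    """Return True if appservices should be notified for this stream update."""
    if stream_key not in SUPPORTED_EPHEMERAL_STREAMS:
        return False
    if stream_key == StreamKeyType.TO_DEVICE and not msc2409_enabled:
        return False
    if stream_key == StreamKeyType.DEVICE_LIST and not msc3202_enabled:
        return False
    return True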
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a91b1ee4d5..1d6d1f8a92 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,6 +43,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.types import (
JsonDict,
+ StreamKeyType,
StreamToken,
UserID,
get_domain_from_id,
@@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler):
# specify the user ID too since the user should always get their own device list
# updates, even if they aren't in any rooms.
self.notifier.on_new_event(
- "device_list_key", position, users={user_id}, rooms=room_ids
+ StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
)
# We may need to do some processing asynchronously for local user IDs.
@@ -523,7 +524,9 @@ class DeviceHandler(DeviceWorkerHandler):
from_user_id, user_ids
)
- self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
+ self.notifier.on_new_event(
+ StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
+ )
async def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 4cb725d027..53668cce3b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -26,7 +26,7 @@ from synapse.logging.opentracing import (
set_tag,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string
@@ -151,7 +151,7 @@ class DeviceMessageHandler:
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", last_stream_id, users=local_messages.keys()
+ StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
)
async def _check_for_unknown_devices(
@@ -285,7 +285,7 @@ class DeviceMessageHandler:
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", last_stream_id, users=local_messages.keys()
+ StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
)
if self.federation_sender:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d6714228ef..e6c2cfb8c8 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple
import attr
from canonicaljson import encode_canonical_json
@@ -1105,22 +1105,19 @@ class E2eKeysHandler:
# can request over federation
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
- (
- key,
- key_id,
- verify_key,
- ) = await self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
-
- if key is None:
+ cross_signing_keys = await self._retrieve_cross_signing_keys_for_remote_user(
+ user, key_type
+ )
+ if cross_signing_keys is None:
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
- return key, key_id, verify_key
+ return cross_signing_keys
async def _retrieve_cross_signing_keys_for_remote_user(
self,
user: UserID,
desired_key_type: str,
- ) -> Tuple[Optional[dict], Optional[str], Optional[VerifyKey]]:
+ ) -> Optional[Tuple[Dict[str, Any], str, VerifyKey]]:
"""Queries cross-signing keys for a remote user and saves them to the database
Only the key specified by `key_type` will be returned, while all retrieved keys
@@ -1146,12 +1143,10 @@ class E2eKeysHandler:
type(e),
e,
)
- return None, None, None
+ return None
# Process each of the retrieved cross-signing keys
- desired_key = None
- desired_key_id = None
- desired_verify_key = None
+ desired_key_data = None
retrieved_device_ids = []
for key_type in ["master", "self_signing"]:
key_content = remote_result.get(key_type + "_key")
@@ -1196,9 +1191,7 @@ class E2eKeysHandler:
# If this is the desired key type, save it and its ID/VerifyKey
if key_type == desired_key_type:
- desired_key = key_content
- desired_verify_key = verify_key
- desired_key_id = key_id
+ desired_key_data = key_content, key_id, verify_key
# At the same time, store this key in the db for subsequent queries
await self.store.set_e2e_cross_signing_key(
@@ -1212,7 +1205,7 @@ class E2eKeysHandler:
user.to_string(), retrieved_device_ids
)
- return desired_key, desired_key_id, desired_verify_key
+ return desired_key_data
def _check_cross_signing_key(
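# The hunk above changes _retrieve_cross_signing_keys_for_remote_user from returning a
# tuple of three Optionals to returning an Optional tuple, so callers make a single
# None check instead of inspecting each element. A generic sketch of the pattern with
# hypothetical names:
from typing import Any, Dict, Optional, Tuple


def lookup_key(found: bool) -> Optional[Tuple[Dict[str, Any], str, str]]:
    if not found:
        return None
    return {"keys": {}}, "a_key_id", "a_verify_key"


result = lookup_key(found=True)
if result is None:
    raise LookupError("no key found")
key, key_id, verify_key = result  # safe to unpack once the None case is handled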
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 38dc5b1f6e..be5099b507 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -659,7 +659,7 @@ class FederationHandler:
# in the invitee's sync stream. It is stripped out for all other local users.
event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
- context = EventContext.for_outlier()
+ context = EventContext.for_outlier(self.storage)
stream_id = await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
@@ -848,7 +848,7 @@ class FederationHandler:
)
)
- context = EventContext.for_outlier()
+ context = EventContext.for_outlier(self.storage)
await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
@@ -877,7 +877,7 @@ class FederationHandler:
await self.federation_client.send_leave(host_list, event)
- context = EventContext.for_outlier()
+ context = EventContext.for_outlier(self.storage)
stream_id = await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 6cf927e4ff..761caa04b7 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -103,7 +103,7 @@ class FederationEventHandler:
self._event_creation_handler = hs.get_event_creation_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._message_handler = hs.get_message_handler()
- self._action_generator = hs.get_action_generator()
+ self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._state_resolution_handler = hs.get_state_resolution_handler()
# avoid a circular dependency by deferring execution here
self._get_room_member_handler = hs.get_room_member_handler
@@ -1423,7 +1423,7 @@ class FederationEventHandler:
# we're not bothering about room state, so flag the event as an outlier.
event.internal_metadata.outlier = True
- context = EventContext.for_outlier()
+ context = EventContext.for_outlier(self._storage)
try:
validate_event_for_room_version(room_version_obj, event)
check_auth_rules_for_event(room_version_obj, event, auth)
@@ -1874,10 +1874,10 @@ class FederationEventHandler:
)
return EventContext.with_state(
+ storage=self._storage,
state_group=state_group,
state_group_before_event=context.state_group_before_event,
- current_state_ids=current_state_ids,
- prev_state_ids=prev_state_ids,
+ state_delta_due_to_event=state_updates,
prev_group=prev_group,
delta_ids=state_updates,
partial_state=context.partial_state,
@@ -1913,7 +1913,7 @@ class FederationEventHandler:
min_depth,
)
else:
- await self._action_generator.handle_push_actions_for_event(
+ await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 7b94770f97..d79248ad90 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -30,6 +30,7 @@ from synapse.types import (
Requester,
RoomStreamToken,
StateMap,
+ StreamKeyType,
StreamToken,
UserID,
)
@@ -143,7 +144,7 @@ class InitialSyncHandler:
to_key=int(now_token.receipt_key),
)
if self.hs.config.experimental.msc2285_enabled:
- receipt = ReceiptEventSource.filter_out_private(receipt, user_id)
+ receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -220,8 +221,10 @@ class InitialSyncHandler:
self.storage, user_id, messages
)
- start_token = now_token.copy_and_replace("room_key", token)
- end_token = now_token.copy_and_replace("room_key", room_end_token)
+ start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
+ end_token = now_token.copy_and_replace(
+ StreamKeyType.ROOM, room_end_token
+ )
time_now = self.clock.time_msec()
d["messages"] = {
@@ -369,8 +372,8 @@ class InitialSyncHandler:
self.storage, user_id, messages, is_peeking=is_peeking
)
- start_token = StreamToken.START.copy_and_replace("room_key", token)
- end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
+ start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
+ end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)
time_now = self.clock.time_msec()
@@ -449,7 +452,9 @@ class InitialSyncHandler:
if not receipts:
return []
if self.hs.config.experimental.msc2285_enabled:
- receipts = ReceiptEventSource.filter_out_private(receipts, user_id)
+ receipts = ReceiptEventSource.filter_out_private_receipts(
+ receipts, user_id
+ )
return receipts
presence, receipts, (messages, token) = await make_deferred_yieldable(
@@ -472,7 +477,7 @@ class InitialSyncHandler:
self.storage, user_id, messages, is_peeking=is_peeking
)
- start_token = now_token.copy_and_replace("room_key", token)
+ start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token
time_now = self.clock.time_msec()
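# copy_and_replace(...) builds a new token with a single stream position swapped out,
# leaving the original token untouched; the hunks above only change the key argument
# from the "room_key" literal to StreamKeyType.ROOM, which stands in for the same
# string. A rough attrs-based sketch of the idea, not Synapse's real StreamToken:
import attr


@attr.s(slots=True, frozen=True, auto_attribs=True)
class TokenSketch:
    room_key: int
    receipt_key: int

    def copy_and_replace(self, key: str, new_value: int) -> "TokenSketch":
        # Return a copy with one named stream position replaced.
        return attr.evolve(self, **{key: new_value})


token = TokenSketch(room_key=5, receipt_key=7)
assert token.copy_and_replace("room_key", 9) == TokenSketch(room_key=9, receipt_key=7)
assert token.room_key == 5  # the original token is unchanged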
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c28b792e6f..0951b9c71f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -44,7 +44,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.event_auth import validate_event_for_room_version
-from synapse.events import EventBase
+from synapse.events import EventBase, relation_from_event
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
@@ -426,7 +426,7 @@ class EventCreationHandler:
# This is to stop us from diverging history *too* much.
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
- self.action_generator = hs.get_action_generator()
+ self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self.spam_checker = hs.get_spam_checker()
self.third_party_event_rules: "ThirdPartyEventRules" = (
@@ -757,6 +757,10 @@ class EventCreationHandler:
The previous version of the event is returned, if it is found in the
event context. Otherwise, None is returned.
"""
+ if event.internal_metadata.is_outlier():
+ # This can happen due to out of band memberships
+ return None
+
prev_state_ids = await context.get_prev_state_ids()
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
@@ -1001,7 +1005,7 @@ class EventCreationHandler:
# after it is created
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
- context = EventContext.for_outlier()
+ context = EventContext.for_outlier(self.storage)
elif (
event.type == EventTypes.MSC2716_INSERTION
and state_event_ids
@@ -1056,20 +1060,11 @@ class EventCreationHandler:
SynapseError if the event is invalid.
"""
- relation = event.content.get("m.relates_to")
+ relation = relation_from_event(event)
if not relation:
return
- relation_type = relation.get("rel_type")
- if not relation_type:
- return
-
- # Ensure the parent is real.
- relates_to = relation.get("event_id")
- if not relates_to:
- return
-
- parent_event = await self.store.get_event(relates_to, allow_none=True)
+ parent_event = await self.store.get_event(relation.parent_id, allow_none=True)
if parent_event:
# And in the same room.
if parent_event.room_id != event.room_id:
@@ -1078,28 +1073,31 @@ class EventCreationHandler:
else:
# There must be some reason that the client knows the event exists,
# see if there are existing relations. If so, assume everything is fine.
- if not await self.store.event_is_target_of_relation(relates_to):
+ if not await self.store.event_is_target_of_relation(relation.parent_id):
# Otherwise, the client can't know about the parent event!
raise SynapseError(400, "Can't send relation to unknown event")
# If this event is an annotation then we check that the sender
# can't annotate the same way twice (e.g. stops users from liking an
# event multiple times).
- if relation_type == RelationTypes.ANNOTATION:
- aggregation_key = relation["key"]
+ if relation.rel_type == RelationTypes.ANNOTATION:
+ aggregation_key = relation.aggregation_key
+
+ if aggregation_key is None:
+ raise SynapseError(400, "Missing aggregation key")
if len(aggregation_key) > 500:
raise SynapseError(400, "Aggregation key is too long")
already_exists = await self.store.has_user_annotated_event(
- relates_to, event.type, aggregation_key, event.sender
+ relation.parent_id, event.type, aggregation_key, event.sender
)
if already_exists:
raise SynapseError(400, "Can't send same reaction twice")
# Don't attempt to start a thread if the parent event is a relation.
- elif relation_type == RelationTypes.THREAD:
- if await self.store.event_includes_relation(relates_to):
+ elif relation.rel_type == RelationTypes.THREAD:
+ if await self.store.event_includes_relation(relation.parent_id):
raise SynapseError(
400, "Cannot start threads from an event with a relation"
)
@@ -1245,7 +1243,9 @@ class EventCreationHandler:
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
- await self.action_generator.handle_push_actions_for_event(event, context)
+ await self._bulk_push_rule_evaluator.action_for_event_by_user(
+ event, context
+ )
try:
# If we're a worker we need to hit out to the master.
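# relation_from_event(...) replaces the ad-hoc parsing of event.content["m.relates_to"]
# that the removed lines above performed, returning a structured object exposing
# `parent_id`, `rel_type` and `aggregation_key` (or None when there is no usable
# relation). A rough sketch of that shape; the helper name below is hypothetical and
# not the exact Synapse implementation:
import collections.abc
from typing import Any, Mapping, Optional

import attr


@attr.s(slots=True, frozen=True, auto_attribs=True)
class RelationSketch:
    parent_id: str
    rel_type: str
    aggregation_key: Optional[str]


def relation_from_content(content: Mapping[str, Any]) -> Optional[RelationSketch]:
    relates_to = content.get("m.relates_to")
    if not isinstance(relates_to, collections.abc.Mapping):
        return None
    parent_id = relates_to.get("event_id")
    rel_type = relates_to.get("rel_type")
    if not parent_id or not rel_type:
        return None
    # The annotation branch above reads the aggregation key from "key".
    return RelationSketch(parent_id, rel_type, relates_to.get("key"))


assert relation_from_content({}) is None
rel = relation_from_content(
    {"m.relates_to": {"rel_type": "m.annotation", "event_id": "$parent", "key": "👍"}}
)
assert rel is not None and rel.aggregation_key == "👍"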
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 7ee3340373..6ae88add95 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,7 @@ from synapse.handlers.room import ShutdownRoomResponse
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester
+from synapse.types import JsonDict, Requester, StreamKeyType
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -448,7 +448,7 @@ class PaginationHandler:
)
# We expect `/messages` to use historic pagination tokens by default but
# `/messages` should still work with live tokens when manually provided.
- assert from_token.room_key.topological
+ assert from_token.room_key.topological is not None
if pagin_config.limit is None:
# This shouldn't happen as we've set a default limit before this
@@ -491,7 +491,7 @@ class PaginationHandler:
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
- "room_key", leave_token
+ StreamKeyType.ROOM, leave_token
)
await self.hs.get_federation_handler().maybe_backfill(
@@ -513,7 +513,7 @@ class PaginationHandler:
event_filter=event_filter,
)
- next_token = from_token.copy_and_replace("room_key", next_key)
+ next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
if events:
if event_filter:
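# The assertion above is tightened from a truthiness check to an explicit `is not None`
# check: a topological ordering of 0 is falsy yet still denotes a real position, so it
# must not trip the assertion. Minimal illustration:
topological: int = 0
assert topological is not None  # passes: 0 is a valid topological position
# `assert topological` would raise AssertionError here, even though the token is usable.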
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 268481ec19..dd84e6c88b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -66,7 +66,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
-from synapse.types import JsonDict, UserID, get_domain_from_id
+from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
@@ -522,7 +522,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key",
+ StreamKeyType.PRESENCE,
stream_id,
rooms=room_ids_to_states.keys(),
users=users_to_states.keys(),
@@ -1145,7 +1145,7 @@ class PresenceHandler(BasePresenceHandler):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key",
+ StreamKeyType.PRESENCE,
stream_id,
rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states],
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 43d615357b..e6a35f1d09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -17,7 +17,13 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse.api.constants import ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
-from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
+from synapse.types import (
+ JsonDict,
+ ReadReceipt,
+ StreamKeyType,
+ UserID,
+ get_domain_from_id,
+)
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -129,7 +135,9 @@ class ReceiptsHandler:
affected_room_ids = list({r.room_id for r in receipts})
- self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
+ self.notifier.on_new_event(
+ StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
+ )
# Note that the min here shouldn't be relied upon to be accurate.
await self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
@@ -165,43 +173,69 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
self.config = hs.config
@staticmethod
- def filter_out_private(events: List[JsonDict], user_id: str) -> List[JsonDict]:
- """
- This method takes in what is returned by
- get_linearized_receipts_for_rooms() and goes through read receipts
- filtering out m.read.private receipts if they were not sent by the
- current user.
+ def filter_out_private_receipts(
+ rooms: List[JsonDict], user_id: str
+ ) -> List[JsonDict]:
"""
+ Filters a list of serialized receipts (as returned by /sync and /initialSync)
+ and removes private read receipts of other users.
- visible_events = []
-
- # filter out private receipts the user shouldn't see
- for event in events:
- content = event.get("content", {})
- new_event = event.copy()
- new_event["content"] = {}
-
- for event_id, event_content in content.items():
- receipt_event = {}
- for receipt_type, receipt_content in event_content.items():
- if receipt_type == ReceiptTypes.READ_PRIVATE:
- user_rr = receipt_content.get(user_id, None)
- if user_rr:
- receipt_event[ReceiptTypes.READ_PRIVATE] = {
- user_id: user_rr.copy()
- }
- else:
- receipt_event[receipt_type] = receipt_content.copy()
+ This operates on the return value of get_linearized_receipts_for_rooms(),
+ which is wrapped in a cache. Care must be taken to ensure that the input
+ values are not modified.
- # Only include the receipt event if it is non-empty.
- if receipt_event:
- new_event["content"][event_id] = receipt_event
+ Args:
+ rooms: A list of mappings, each mapping has a `content` field, which
+ is a map of event ID -> receipt type -> user ID -> receipt information.
- # Append new_event to visible_events unless empty
- if len(new_event["content"].keys()) > 0:
- visible_events.append(new_event)
+ Returns:
+ The same as rooms, but filtered.
+ """
- return visible_events
+ result = []
+
+ # Iterate through each room's receipt content.
+ for room in rooms:
+ # The receipt content with other users' private read receipts removed.
+ content = {}
+
+ # Iterate over each event ID / receipts for that event.
+ for event_id, orig_event_content in room.get("content", {}).items():
+ event_content = orig_event_content
+ # If there are private read receipts, additional logic is necessary.
+ if ReceiptTypes.READ_PRIVATE in event_content:
+ # Make a copy without private read receipts to avoid leaking
+ # other users' private read receipts.
+ event_content = {
+ receipt_type: receipt_value
+ for receipt_type, receipt_value in event_content.items()
+ if receipt_type != ReceiptTypes.READ_PRIVATE
+ }
+
+ # Copy the current user's private read receipt from the
+ # original content, if it exists.
+ user_private_read_receipt = orig_event_content[
+ ReceiptTypes.READ_PRIVATE
+ ].get(user_id, None)
+ if user_private_read_receipt:
+ event_content[ReceiptTypes.READ_PRIVATE] = {
+ user_id: user_private_read_receipt
+ }
+
+ # Include the event if there is at least one non-private read
+ # receipt or the current user has a private read receipt.
+ if event_content:
+ content[event_id] = event_content
+
+ # Include the room if there is at least one non-private read receipt
+ # or the current user has a private read receipt.
+ if content:
+ # Build a new event to avoid mutating the cache.
+ new_room = {k: v for k, v in room.items() if k != "content"}
+ new_room["content"] = content
+ result.append(new_room)
+
+ return result
async def get_new_events(
self,
@@ -223,7 +257,9 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
)
if self.config.experimental.msc2285_enabled:
- events = ReceiptEventSource.filter_out_private(events, user.to_string())
+ events = ReceiptEventSource.filter_out_private_receipts(
+ events, user.to_string()
+ )
return events, to_key
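# A worked example of the filtering implemented above, using illustrative data (the
# receipt-type names follow the "m.read" / "m.read.private" strings from the old
# docstring; real deployments may use prefixed experimental names).
# Input entry, shaped like the output of get_linearized_receipts_for_rooms():
room = {
    "room_id": "!room:example.org",
    "content": {
        "$event1": {
            "m.read": {"@alice:example.org": {"ts": 1}},
            "m.read.private": {
                "@alice:example.org": {"ts": 2},
                "@bob:example.org": {"ts": 3},
            },
        },
    },
}
# After filtering for "@alice:example.org", Bob's private read receipt is dropped while
# the public receipts and Alice's own private receipt survive:
expected_content = {
    "$event1": {
        "m.read": {"@alice:example.org": {"ts": 1}},
        "m.read.private": {"@alice:example.org": {"ts": 2}},
    },
}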
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index c2754ec918..ab7e54857d 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import collections.abc
import logging
from typing import (
TYPE_CHECKING,
@@ -28,7 +27,7 @@ import attr
from synapse.api.constants import RelationTypes
from synapse.api.errors import SynapseError
-from synapse.events import EventBase
+from synapse.events import EventBase, relation_from_event
from synapse.storage.databases.main.relations import _RelatedEvent
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.visibility import filter_events_for_client
@@ -373,20 +372,21 @@ class RelationsHandler:
if event.is_state():
continue
- relates_to = event.content.get("m.relates_to")
- relation_type = None
- if isinstance(relates_to, collections.abc.Mapping):
- relation_type = relates_to.get("rel_type")
+ relates_to = relation_from_event(event)
+ if relates_to:
# An event which is a replacement (ie edit) or annotation (ie,
# reaction) may not have any other event related to it.
- if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
+ if relates_to.rel_type in (
+ RelationTypes.ANNOTATION,
+ RelationTypes.REPLACE,
+ ):
continue
+ # Track the event's relation information for later.
+ relations_by_id[event.event_id] = relates_to.rel_type
+
# The event should get bundled aggregations.
events_by_id[event.event_id] = event
- # Track the event's relation information for later.
- if isinstance(relation_type, str):
- relations_by_id[event.event_id] = relation_type
# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {}
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 604eb6ec15..a2973109ad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -33,6 +33,7 @@ from typing import (
import attr
from typing_extensions import TypedDict
+import synapse.events.snapshot
from synapse.api.constants import (
EventContentFields,
EventTypes,
@@ -72,12 +73,12 @@ from synapse.types import (
RoomID,
RoomStreamToken,
StateMap,
+ StreamKeyType,
StreamToken,
UserID,
create_requester,
)
from synapse.util import stringutils
-from synapse.util.async_helpers import Linearizer
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_and_validate_server_name
from synapse.visibility import filter_events_for_client
@@ -149,10 +150,11 @@ class RoomCreationHandler:
)
preset_config["encrypted"] = encrypted
- self._replication = hs.get_replication_data_handler()
+ self._default_power_level_content_override = (
+ self.config.room.default_power_level_content_override
+ )
- # linearizer to stop two upgrades happening at once
- self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
+ self._replication = hs.get_replication_data_handler()
# If a user tries to update the same room multiple times in quick
# succession, only process the first attempt and return its result to
@@ -196,6 +198,39 @@ class RoomCreationHandler:
400, "An upgrade for this room is currently in progress"
)
+ # Check whether the room exists and 404 if it doesn't.
+ # We could go straight for the auth check, but that will raise a 403 instead.
+ old_room = await self.store.get_room(old_room_id)
+ if old_room is None:
+ raise NotFoundError("Unknown room id %s" % (old_room_id,))
+
+ new_room_id = self._generate_room_id()
+
+ # Check whether the user has the power level to carry out the upgrade.
+ # `check_auth_rules_from_context` will check that they are in the room and have
+ # the required power level to send the tombstone event.
+ (
+ tombstone_event,
+ tombstone_context,
+ ) = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Tombstone,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": user_id,
+ "content": {
+ "body": "This room has been replaced",
+ "replacement_room": new_room_id,
+ },
+ },
+ )
+ old_room_version = await self.store.get_room_version(old_room_id)
+ validate_event_for_room_version(old_room_version, tombstone_event)
+ await self._event_auth_handler.check_auth_rules_from_context(
+ old_room_version, tombstone_event, tombstone_context
+ )
+
# Upgrade the room
#
# If this user has sent multiple upgrade requests for the same room
@@ -206,19 +241,35 @@ class RoomCreationHandler:
self._upgrade_room,
requester,
old_room_id,
- new_version, # args for _upgrade_room
+ old_room, # args for _upgrade_room
+ new_room_id,
+ new_version,
+ tombstone_event,
+ tombstone_context,
)
return ret
async def _upgrade_room(
- self, requester: Requester, old_room_id: str, new_version: RoomVersion
+ self,
+ requester: Requester,
+ old_room_id: str,
+ old_room: Dict[str, Any],
+ new_room_id: str,
+ new_version: RoomVersion,
+ tombstone_event: EventBase,
+ tombstone_context: synapse.events.snapshot.EventContext,
) -> str:
"""
Args:
requester: the user requesting the upgrade
old_room_id: the id of the room to be replaced
- new_versions: the version to upgrade the room to
+ old_room: a dict containing room information for the room to be replaced,
+ as returned by `RoomWorkerStore.get_room`.
+ new_room_id: the id of the replacement room
+ new_version: the version to upgrade the room to
+ tombstone_event: the tombstone event to send to the old room
+ tombstone_context: the context for the tombstone event
Raises:
ShadowBanError if the requester is shadow-banned.
@@ -226,40 +277,15 @@ class RoomCreationHandler:
user_id = requester.user.to_string()
assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)
- # start by allocating a new room id
- r = await self.store.get_room(old_room_id)
- if r is None:
- raise NotFoundError("Unknown room id %s" % (old_room_id,))
- new_room_id = await self._generate_room_id(
- creator_id=user_id,
- is_public=r["is_public"],
- room_version=new_version,
- )
-
logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
- # we create and auth the tombstone event before properly creating the new
- # room, to check our user has perms in the old room.
- (
- tombstone_event,
- tombstone_context,
- ) = await self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Tombstone,
- "state_key": "",
- "room_id": old_room_id,
- "sender": user_id,
- "content": {
- "body": "This room has been replaced",
- "replacement_room": new_room_id,
- },
- },
- )
- old_room_version = await self.store.get_room_version(old_room_id)
- validate_event_for_room_version(old_room_version, tombstone_event)
- await self._event_auth_handler.check_auth_rules_from_context(
- old_room_version, tombstone_event, tombstone_context
+ # create the new room. may raise a `StoreError` in the exceedingly unlikely
+ # event of a room ID collision.
+ await self.store.store_room(
+ room_id=new_room_id,
+ room_creator_user_id=user_id,
+ is_public=old_room["is_public"],
+ room_version=new_version,
)
await self.clone_existing_room(
@@ -778,7 +804,7 @@ class RoomCreationHandler:
visibility = config.get("visibility", "private")
is_public = visibility == "public"
- room_id = await self._generate_room_id(
+ room_id = await self._generate_and_create_room_id(
creator_id=user_id,
is_public=is_public,
room_version=room_version,
@@ -1042,9 +1068,19 @@ class RoomCreationHandler:
for invitee in invite_list:
power_level_content["users"][invitee] = 100
- # Power levels overrides are defined per chat preset
+ # If the user supplied a preset name e.g. "private_chat",
+ # we apply that preset
power_level_content.update(config["power_level_content_override"])
+ # If the server config contains default_power_level_content_override,
+ # and that contains information for this room preset, apply it.
+ if self._default_power_level_content_override:
+ override = self._default_power_level_content_override.get(preset_config)
+ if override is not None:
+ power_level_content.update(override)
+
+ # Finally, if the user supplied specific permissions for this room,
+ # apply those.
if power_level_content_override:
power_level_content.update(power_level_content_override)
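# The hunk above layers power-level overrides in a fixed order: the preset's own
# override first, then any per-preset entry from the server's
# default_power_level_content_override config, then the power levels supplied in the
# room-creation request, with later layers winning. A hedged sketch of that merge order
# (hypothetical function, not the real handler code):
from typing import Any, Dict, Optional


def merge_power_levels(
    preset_power_levels: Dict[str, Any],
    server_override_for_preset: Optional[Dict[str, Any]],
    request_override: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    merged = dict(preset_power_levels)
    if server_override_for_preset is not None:
        merged.update(server_override_for_preset)
    if request_override:
        merged.update(request_override)
    return merged


assert merge_power_levels(
    {"events_default": 0, "state_default": 50},
    {"events_default": 50},
    {"state_default": 100},
) == {"events_default": 50, "state_default": 100}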
@@ -1090,7 +1126,26 @@ class RoomCreationHandler:
return last_sent_stream_id
- async def _generate_room_id(
+ def _generate_room_id(self) -> str:
+ """Generates a random room ID.
+
+ Room IDs look like "!opaque_id:domain" and are case-sensitive as per the spec
+ at https://spec.matrix.org/v1.2/appendices/#room-ids-and-event-ids.
+
+ Does not check for collisions with existing rooms or prevent future calls from
+ returning the same room ID. To ensure the uniqueness of a new room ID, use
+ `_generate_and_create_room_id` instead.
+
+ Synapse's room IDs are 18 [a-zA-Z] characters long, which comes out to around
+ 102 bits.
+
+ Returns:
+ A random room ID of the form "!opaque_id:domain".
+ """
+ random_string = stringutils.random_string(18)
+ return RoomID(random_string, self.hs.hostname).to_string()
+
+ async def _generate_and_create_room_id(
self,
creator_id: str,
is_public: bool,
@@ -1101,8 +1156,7 @@ class RoomCreationHandler:
attempts = 0
while attempts < 5:
try:
- random_string = stringutils.random_string(18)
- gen_room_id = RoomID(random_string, self.hs.hostname).to_string()
+ gen_room_id = self._generate_room_id()
await self.store.store_room(
room_id=gen_room_id,
room_creator_user_id=creator_id,
@@ -1239,10 +1293,10 @@ class RoomContextHandler:
events_after=events_after,
state=await filter_evts(state_events),
aggregations=aggregations,
- start=await token.copy_and_replace("room_key", results.start).to_string(
- self.store
- ),
- end=await token.copy_and_replace("room_key", results.end).to_string(
+ start=await token.copy_and_replace(
+ StreamKeyType.ROOM, results.start
+ ).to_string(self.store),
+ end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string(
self.store
),
)
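# The new _generate_room_id docstring above quotes "around 102 bits" for an
# 18-character [a-zA-Z] identifier; 18 * log2(52) is roughly 102.6. A hedged,
# self-contained sketch of such a generator (the real helper builds a RoomID via
# Synapse's stringutils; the names here are illustrative):
import math
import secrets
import string


def generate_room_id_sketch(hostname: str) -> str:
    opaque = "".join(secrets.choice(string.ascii_letters) for _ in range(18))
    return f"!{opaque}:{hostname}"


assert 102 < 18 * math.log2(52) < 103
print(generate_room_id_sketch("example.org"))  # e.g. "!AbCdEfGhIjKlMnOpQr:example.org"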
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 5619f8f50e..cd1c47dae8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,7 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamKeyType, UserID
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -655,11 +655,11 @@ class SearchHandler:
"events_before": events_before,
"events_after": events_after,
"start": await now_token.copy_and_replace(
- "room_key", res.start
+ StreamKeyType.ROOM, res.start
+ ).to_string(self.store),
+ "end": await now_token.copy_and_replace(
+ StreamKeyType.ROOM, res.end
).to_string(self.store),
- "end": await now_token.copy_and_replace("room_key", res.end).to_string(
- self.store
- ),
}
if include_profile:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c555a66d0..4be08fe7cb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -37,6 +37,7 @@ from synapse.types import (
Requester,
RoomStreamToken,
StateMap,
+ StreamKeyType,
StreamToken,
UserID,
)
@@ -449,7 +450,7 @@ class SyncHandler:
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
- now_token = now_token.copy_and_replace("typing_key", typing_key)
+ now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key)
ephemeral_by_room: JsonDict = {}
@@ -471,7 +472,7 @@ class SyncHandler:
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
- now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+ now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key)
for event in receipts:
room_id = event["room_id"]
@@ -537,7 +538,9 @@ class SyncHandler:
prev_batch_token = now_token
if recents:
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+ prev_batch_token = now_token.copy_and_replace(
+ StreamKeyType.ROOM, room_key
+ )
return TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=False
@@ -611,7 +614,7 @@ class SyncHandler:
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+ prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
@@ -1398,7 +1401,7 @@ class SyncHandler:
now_token.to_device_key,
)
sync_result_builder.now_token = now_token.copy_and_replace(
- "to_device_key", stream_id
+ StreamKeyType.TO_DEVICE, stream_id
)
sync_result_builder.to_device = messages
else:
@@ -1503,7 +1506,7 @@ class SyncHandler:
)
assert presence_key
sync_result_builder.now_token = now_token.copy_and_replace(
- "presence_key", presence_key
+ StreamKeyType.PRESENCE, presence_key
)
extra_users_ids = set(newly_joined_or_invited_users)
@@ -1826,7 +1829,7 @@ class SyncHandler:
# stream token as it'll only be used in the context of this
# room. (c.f. the docstring of `to_room_stream_token`).
leave_token = since_token.copy_and_replace(
- "room_key", leave_position.to_room_stream_token()
+ StreamKeyType.ROOM, leave_position.to_room_stream_token()
)
# If this is an out of band message, like a remote invite
@@ -1875,7 +1878,9 @@ class SyncHandler:
if room_entry:
events, start_key = room_entry
- prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+ prev_batch_token = now_token.copy_and_replace(
+ StreamKeyType.ROOM, start_key
+ )
entry = RoomSyncResultBuilder(
room_id=room_id,
@@ -1972,7 +1977,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- "room_key", RoomStreamToken(None, event.stream_ordering)
+ StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6854428b7c..bb00750bfd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -382,7 +382,7 @@ class TypingWriterHandler(FollowerTypingHandler):
)
self.notifier.on_new_event(
- "typing_key", self._latest_room_serial, rooms=[member.room_id]
+ StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
)
async def get_all_typing_updates(
|