diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index bc04a0755b..89723d24fa 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -230,6 +230,9 @@ class EventContentFields:
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"
+ # an unspecced field added to to-device messages to identify them uniquely-ish
+ TO_DEVICE_MSGID: Final = "org.matrix.msgid"
+
class RoomTypes:
"""Understood values of the room_type field of m.room.create events."""
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 46dc731696..bcc8abe20c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -44,40 +44,8 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
+from synapse.rest import ClientRestResource
from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.client import (
- account_data,
- events,
- initial_sync,
- login,
- presence,
- profile,
- push_rule,
- read_marker,
- receipts,
- relations,
- room,
- room_batch,
- room_keys,
- sendtodevice,
- sync,
- tags,
- user_directory,
- versions,
- voip,
-)
-from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
-from synapse.rest.client.devices import DevicesRestServlet
-from synapse.rest.client.keys import (
- KeyChangesServlet,
- KeyQueryServlet,
- KeyUploadServlet,
- OneTimeKeyServlet,
-)
-from synapse.rest.client.register import (
- RegisterRestServlet,
- RegistrationTokenValidityRestServlet,
-)
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -200,45 +168,7 @@ class GenericWorkerServer(HomeServer):
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
- resource = JsonResource(self, canonical_json=False)
-
- RegisterRestServlet(self).register(resource)
- RegistrationTokenValidityRestServlet(self).register(resource)
- login.register_servlets(self, resource)
- ThreepidRestServlet(self).register(resource)
- WhoamiRestServlet(self).register(resource)
- DevicesRestServlet(self).register(resource)
-
- # Read-only
- KeyUploadServlet(self).register(resource)
- KeyQueryServlet(self).register(resource)
- KeyChangesServlet(self).register(resource)
- OneTimeKeyServlet(self).register(resource)
-
- voip.register_servlets(self, resource)
- push_rule.register_servlets(self, resource)
- versions.register_servlets(self, resource)
-
- profile.register_servlets(self, resource)
-
- sync.register_servlets(self, resource)
- events.register_servlets(self, resource)
- room.register_servlets(self, resource, is_worker=True)
- relations.register_servlets(self, resource)
- room.register_deprecated_servlets(self, resource)
- initial_sync.register_servlets(self, resource)
- room_batch.register_servlets(self, resource)
- room_keys.register_servlets(self, resource)
- tags.register_servlets(self, resource)
- account_data.register_servlets(self, resource)
- receipts.register_servlets(self, resource)
- read_marker.register_servlets(self, resource)
-
- sendtodevice.register_servlets(self, resource)
-
- user_directory.register_servlets(self, resource)
-
- presence.register_servlets(self, resource)
+ resource: Resource = ClientRestResource(self)
resources[CLIENT_API_PREFIX] = resource
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 979b128eae..3b5378e6ea 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -26,6 +26,7 @@ class PushConfig(Config):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
push_config = config.get("push") or {}
self.push_include_content = push_config.get("include_content", True)
+ self.enable_push = push_config.get("enabled", True)
self.push_group_unread_count_by_room = push_config.get(
"group_unread_count_by_room", True
)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ed15f88350..69310d9035 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -14,7 +14,6 @@
import abc
import logging
-import urllib
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
import attr
@@ -813,31 +812,27 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
results = {}
- async def get_key(key_to_fetch_item: _FetchKeyRequest) -> None:
+ async def get_keys(key_to_fetch_item: _FetchKeyRequest) -> None:
server_name = key_to_fetch_item.server_name
- key_ids = key_to_fetch_item.key_ids
try:
- keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
+ keys = await self.get_server_verify_keys_v2_direct(server_name)
results[server_name] = keys
except KeyLookupError as e:
- logger.warning(
- "Error looking up keys %s from %s: %s", key_ids, server_name, e
- )
+ logger.warning("Error looking up keys from %s: %s", server_name, e)
except Exception:
- logger.exception("Error getting keys %s from %s", key_ids, server_name)
+ logger.exception("Error getting keys from %s", server_name)
- await yieldable_gather_results(get_key, keys_to_fetch)
+ await yieldable_gather_results(get_keys, keys_to_fetch)
return results
- async def get_server_verify_key_v2_direct(
- self, server_name: str, key_ids: Iterable[str]
+ async def get_server_verify_keys_v2_direct(
+ self, server_name: str
) -> Dict[str, FetchKeyResult]:
"""
Args:
- server_name:
- key_ids:
+ server_name: Server to request keys from
Returns:
Map from key ID to lookup result
@@ -845,57 +840,41 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
Raises:
KeyLookupError if there was a problem making the lookup
"""
- keys: Dict[str, FetchKeyResult] = {}
-
- for requested_key_id in key_ids:
- # we may have found this key as a side-effect of asking for another.
- if requested_key_id in keys:
- continue
-
- time_now_ms = self.clock.time_msec()
- try:
- response = await self.client.get_json(
- destination=server_name,
- path="/_matrix/key/v2/server/"
- + urllib.parse.quote(requested_key_id, safe=""),
- ignore_backoff=True,
- # we only give the remote server 10s to respond. It should be an
- # easy request to handle, so if it doesn't reply within 10s, it's
- # probably not going to.
- #
- # Furthermore, when we are acting as a notary server, we cannot
- # wait all day for all of the origin servers, as the requesting
- # server will otherwise time out before we can respond.
- #
- # (Note that get_json may make 4 attempts, so this can still take
- # almost 45 seconds to fetch the headers, plus up to another 60s to
- # read the response).
- timeout=10000,
- )
- except (NotRetryingDestination, RequestSendFailed) as e:
- # these both have str() representations which we can't really improve
- # upon
- raise KeyLookupError(str(e))
- except HttpResponseException as e:
- raise KeyLookupError("Remote server returned an error: %s" % (e,))
-
- assert isinstance(response, dict)
- if response["server_name"] != server_name:
- raise KeyLookupError(
- "Expected a response for server %r not %r"
- % (server_name, response["server_name"])
- )
-
- response_keys = await self.process_v2_response(
- from_server=server_name,
- response_json=response,
- time_added_ms=time_now_ms,
+ time_now_ms = self.clock.time_msec()
+ try:
+ response = await self.client.get_json(
+ destination=server_name,
+ path="/_matrix/key/v2/server",
+ ignore_backoff=True,
+ # we only give the remote server 10s to respond. It should be an
+ # easy request to handle, so if it doesn't reply within 10s, it's
+ # probably not going to.
+ #
+ # Furthermore, when we are acting as a notary server, we cannot
+ # wait all day for all of the origin servers, as the requesting
+ # server will otherwise time out before we can respond.
+ #
+ # (Note that get_json may make 4 attempts, so this can still take
+ # almost 45 seconds to fetch the headers, plus up to another 60s to
+ # read the response).
+ timeout=10000,
)
- await self.store.store_server_verify_keys(
- server_name,
- time_now_ms,
- ((server_name, key_id, key) for key_id, key in response_keys.items()),
+ except (NotRetryingDestination, RequestSendFailed) as e:
+ # these both have str() representations which we can't really improve
+ # upon
+ raise KeyLookupError(str(e))
+ except HttpResponseException as e:
+ raise KeyLookupError("Remote server returned an error: %s" % (e,))
+
+ assert isinstance(response, dict)
+ if response["server_name"] != server_name:
+ raise KeyLookupError(
+ "Expected a response for server %r not %r"
+ % (server_name, response["server_name"])
)
- keys.update(response_keys)
- return keys
+ return await self.process_v2_response(
+ from_server=server_name,
+ response_json=response,
+ time_added_ms=time_now_ms,
+ )
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 8bccc9c60d..137cfb3346 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -771,17 +771,28 @@ class FederationClient(FederationBase):
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
- # There is no good way to detect an "unknown" endpoint.
+ # MSC3743 specifies that servers should return a 404 or 405 with an errcode
+ # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
+ # to an unknown method, respectively.
#
- # Dendrite returns a 404 (with a body of "404 page not found");
- # Conduit returns a 404 (with no body); and Synapse returns a 400
- # with M_UNRECOGNIZED.
- #
- # This needs to be rather specific as some endpoints truly do return 404
- # errors.
+ # Older versions of servers don't properly handle this. This needs to be
+ # rather specific as some endpoints truly do return 404 errors.
return (
- e.code == 404 and (not e.response or e.response == b"404 page not found")
- ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
+ # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
+ (e.code == 404 or e.code == 405)
+ and (
+ # Older Dendrites returned a text or empty body.
+ # Older Conduit returned an empty body.
+ not e.response
+ or e.response == b"404 page not found"
+ # The proper response JSON with M_UNRECOGNIZED errcode.
+ or synapse_error.errcode == Codes.UNRECOGNIZED
+ )
+ ) or (
+ # Older Synapses returned a 400 error.
+ e.code == 400
+ and synapse_error.errcode == Codes.UNRECOGNIZED
+ )
async def _try_destination_list(
self,
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fc1d8c88a7..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+ domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
domains = [
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 5af2784f1e..ffc9d95ee7 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -641,7 +641,7 @@ class PerDestinationQueue:
if not message_id:
continue
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
edus = [
Edu(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 66f5b8d108..5d1d21cdc8 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -578,9 +578,6 @@ class ApplicationServicesHandler:
device_id,
), messages in recipient_device_to_messages.items():
for message_json in messages:
- # Remove 'message_id' from the to-device message, as it's an internal ID
- message_json.pop("message_id", None)
-
message_payload.append(
{
"to_user_id": user_id,
@@ -615,8 +612,8 @@ class ApplicationServicesHandler:
)
# Fetch the users who have modified their device list since then.
- users_with_changed_device_lists = (
- await self.store.get_users_whose_devices_changed(from_key, to_key=new_key)
+ users_with_changed_device_lists = await self.store.get_all_devices_changed(
+ from_key, to_key=new_key
)
# Filter out any users the application service is not interested in
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b1e55e1b9e..d4750a32e6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
- # TODO(faster joins): this fetches and processes a bunch of data that we don't
+ # TODO(faster_joins): this fetches and processes a bunch of data that we don't
# use. Could be replaced by a tighter query e.g.
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
partial_rooms = await self.store.get_partial_state_room_resync_info()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 444c08bc2e..75e89850f5 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
-from synapse.api.constants import EduTypes, ToDeviceEventTypes
+from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
@@ -216,14 +216,24 @@ class DeviceMessageHandler:
"""
sender_user_id = requester.user.to_string()
- message_id = random_string(16)
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
-
- log_kv({"number_of_to_device_messages": len(messages)})
- set_tag("sender", sender_user_id)
+ set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
+ set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
+ # add an opentracing log entry for each message
+ for device_id, message_content in by_device.items():
+ log_kv(
+ {
+ "event": "send_to_device_message",
+ "user_id": user_id,
+ "device_id": device_id,
+ EventContentFields.TO_DEVICE_MSGID: message_content.get(
+ EventContentFields.TO_DEVICE_MSGID
+ ),
+ }
+ )
+
# Ratelimit local cross-user key requests by the sending device.
if (
message_type == ToDeviceEventTypes.RoomKeyRequest
@@ -233,6 +243,7 @@ class DeviceMessageHandler:
requester, (sender_user_id, requester.device_id)
)
if not allowed:
+ log_kv({"message": f"dropping key requests to {user_id}"})
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
@@ -247,18 +258,11 @@ class DeviceMessageHandler:
"content": message_content,
"type": message_type,
"sender": sender_user_id,
- "message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
- log_kv(
- {
- "user_id": user_id,
- "device_id": list(messages_by_device),
- }
- )
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
@@ -267,7 +271,11 @@ class DeviceMessageHandler:
remote_edu_contents = {}
for destination, messages in remote_messages.items():
- log_kv({"destination": destination})
+ # The EDU contains a "message_id" property which is used for
+ # idempotence. Make up a random one.
+ message_id = random_string(16)
+ log_kv({"destination": destination, "message_id": message_id})
+
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d92582fd5c..3398fcaf7d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -152,6 +152,7 @@ class FederationHandler:
self._federation_event_handler = hs.get_federation_event_handler()
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
+ self._notifier = hs.get_notifier()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@@ -1692,6 +1693,9 @@ class FederationHandler:
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
+ # Poke the notifier so that other workers see the write to
+ # the un-partial-stated rooms stream.
+ self._notifier.notify_replication()
# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf08737d11..2af90b25a3 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
if from_key is not None:
# First get all users that have had a presence update
- updated_users = stream_change_cache.get_all_entities_changed(from_key)
+ result = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates.
- if updated_users is not None:
+ if result.hit:
+ updated_users = result.entities
+
# If we have the full list of changes for presence we can
# simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
@@ -1764,14 +1766,14 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
Returns:
A list of presence states for the given user to receive.
"""
+ updated_users = None
if from_key:
# Only return updates since the last sync
- updated_users = self.store.presence_stream_cache.get_all_entities_changed(
- from_key
- )
- if not updated_users:
- updated_users = []
+ result = self.store.presence_stream_cache.get_all_entities_changed(from_key)
+ if result.hit:
+ updated_users = result.entities
+ if updated_users is not None:
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c8858b22dd..dace9b606f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -31,14 +31,20 @@ from typing import (
import attr
from prometheus_client import Counter
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
+from synapse.logging.opentracing import (
+ SynapseTags,
+ log_kv,
+ set_tag,
+ start_active_span,
+ trace,
+)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
@@ -1528,10 +1534,12 @@ class SyncHandler:
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
- changed_users = self.store.get_cached_device_list_changes(
+ cache_result = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
- if changed_users is not None:
+ if cache_result.hit:
+ changed_users = cache_result.entities
+
result = await self.store.get_rooms_for_users(changed_users)
for changed_user_id, entries in result.items():
@@ -1584,6 +1592,7 @@ class SyncHandler:
else:
return DeviceListUpdates()
+ @trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
@@ -1603,11 +1612,16 @@ class SyncHandler:
)
for message in messages:
- # We pop here as we shouldn't be sending the message ID down
- # `/sync`
- message_id = message.pop("message_id", None)
- if message_id:
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ log_kv(
+ {
+ "event": "to_device_message",
+ "sender": message["sender"],
+ "type": message["type"],
+ EventContentFields.TO_DEVICE_MSGID: message["content"].get(
+ EventContentFields.TO_DEVICE_MSGID
+ ),
+ }
+ )
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a0ea719430..3f656ea4f5 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler):
if last_id == current_id:
return [], current_id, False
- changed_rooms: Optional[
- Iterable[str]
- ] = self._typing_stream_change_cache.get_all_entities_changed(last_id)
+ result = self._typing_stream_change_cache.get_all_entities_changed(last_id)
- if changed_rooms is None:
+ if result.hit:
+ changed_rooms: Iterable[str] = result.entities
+ else:
changed_rooms = self._room_serials
rows = []
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index b69060854f..a705af8356 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -292,8 +292,15 @@ logger = logging.getLogger(__name__)
class SynapseTags:
- # The message ID of any to_device message processed
- TO_DEVICE_MESSAGE_ID = "to_device.message_id"
+ # The message ID of any to_device EDU processed
+ TO_DEVICE_EDU_ID = "to_device.edu_id"
+
+ # Details about to-device messages
+ TO_DEVICE_TYPE = "to_device.type"
+ TO_DEVICE_SENDER = "to_device.sender"
+ TO_DEVICE_RECIPIENT = "to_device.recipient"
+ TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
+ TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID
# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index d6b377860f..9ed35d8461 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -106,6 +106,7 @@ class BulkPushRuleEvaluator:
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self._event_auth_handler = hs.get_event_auth_handler()
+ self.should_calculate_push_rules = self.hs.config.push.enable_push
self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
@@ -269,6 +270,8 @@ class BulkPushRuleEvaluator:
for each event, check if the message should increment the unread count, and
insert the results into the event_push_actions_staging table.
"""
+ if not self.should_calculate_push_rules:
+ return
# For batched events the power level events may not have been persisted yet,
# so we pass in the batched events. Thus if the event cannot be found in the
# database we can check in the batch.
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index edeba27a45..7ee07e4bee 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,7 +17,6 @@ from synapse.events import EventBase
from synapse.push.presentable_names import calculate_room_name, name_from_member_event
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
-from synapse.util.async_helpers import concurrently_execute
async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
@@ -26,23 +25,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
badge = len(invites)
- room_notifs = []
-
- async def get_room_unread_count(room_id: str) -> None:
- room_notifs.append(
- await store.get_unread_event_push_actions_by_room_for_user(
- room_id,
- user_id,
- )
- )
-
- await concurrently_execute(get_room_unread_count, joins, 10)
-
- for notifs in room_notifs:
- # Combine the counts from all the threads.
- notify_count = notifs.main_timeline.notify_count + sum(
- n.notify_count for n in notifs.threads.values()
- )
+ room_to_count = await store.get_unread_counts_by_room_for_user(user_id)
+ for room_id, notify_count in room_to_count.items():
+ # room_to_count may include rooms which the user has left,
+ # ignore those.
+ if room_id not in joins:
+ continue
if notify_count == 0:
continue
@@ -51,8 +39,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
# return one badge count per conversation
badge += 1
else:
- # increment the badge count by the number of unread messages in the room
+ # Increase badge by number of notifications in room
+ # NOTE: this includes threaded and unthreaded notifications.
badge += notify_count
+
return badge
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index b1cd55bf6f..8575666d9c 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import (
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
STREAMS_MAP = {
stream.NAME: stream
@@ -61,6 +62,7 @@ STREAMS_MAP = {
TagAccountDataStream,
AccountDataStream,
UserSignatureStream,
+ UnPartialStatedRoomStream,
)
}
@@ -80,4 +82,5 @@ __all__ = [
"TagAccountDataStream",
"AccountDataStream",
"UserSignatureStream",
+ "UnPartialStatedRoomStream",
]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
new file mode 100644
index 0000000000..18f087ffa2
--- /dev/null
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -0,0 +1,48 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+import attr
+
+from synapse.replication.tcp.streams import Stream
+from synapse.replication.tcp.streams._base import current_token_without_instance
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedRoomStreamRow:
+ # ID of the room that has been un-partial-stated.
+ room_id: str
+
+
+class UnPartialStatedRoomStream(Stream):
+ """
+ Stream to notify about rooms becoming un-partial-stated;
+ that is, when the background sync finishes such that we now have full state for
+ the room.
+ """
+
+ NAME = "un_partial_stated_room"
+ ROW_TYPE = UnPartialStatedRoomStreamRow
+
+ def __init__(self, hs: "HomeServer"):
+ store = hs.get_datastores().main
+ super().__init__(
+ hs.get_instance_name(),
+ # TODO(faster_joins, multiple writers): we need to account for instance names
+ current_token_without_instance(store.get_un_partial_stated_rooms_token),
+ store.get_un_partial_stated_rooms_from_stream,
+ )
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 28542cd774..14c4e6ebbb 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -29,7 +29,7 @@ from synapse.rest.client import (
initial_sync,
keys,
knock,
- login as v1_login,
+ login,
login_token_request,
logout,
mutual_rooms,
@@ -82,6 +82,10 @@ class ClientRestResource(JsonResource):
@staticmethod
def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
+ # Some servlets are only registered on the main process (and not worker
+ # processes).
+ is_main_process = hs.config.worker.worker_app is None
+
versions.register_servlets(hs, client_resource)
# Deprecated in r0
@@ -92,45 +96,58 @@ class ClientRestResource(JsonResource):
events.register_servlets(hs, client_resource)
room.register_servlets(hs, client_resource)
- v1_login.register_servlets(hs, client_resource)
+ login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
- directory.register_servlets(hs, client_resource)
+ if is_main_process:
+ directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
- pusher.register_servlets(hs, client_resource)
+ if is_main_process:
+ pusher.register_servlets(hs, client_resource)
push_rule.register_servlets(hs, client_resource)
- logout.register_servlets(hs, client_resource)
+ if is_main_process:
+ logout.register_servlets(hs, client_resource)
sync.register_servlets(hs, client_resource)
- filter.register_servlets(hs, client_resource)
+ if is_main_process:
+ filter.register_servlets(hs, client_resource)
account.register_servlets(hs, client_resource)
register.register_servlets(hs, client_resource)
- auth.register_servlets(hs, client_resource)
+ if is_main_process:
+ auth.register_servlets(hs, client_resource)
receipts.register_servlets(hs, client_resource)
read_marker.register_servlets(hs, client_resource)
room_keys.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
- tokenrefresh.register_servlets(hs, client_resource)
+ if is_main_process:
+ tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
account_data.register_servlets(hs, client_resource)
- report_event.register_servlets(hs, client_resource)
- openid.register_servlets(hs, client_resource)
- notifications.register_servlets(hs, client_resource)
+ if is_main_process:
+ report_event.register_servlets(hs, client_resource)
+ openid.register_servlets(hs, client_resource)
+ notifications.register_servlets(hs, client_resource)
devices.register_servlets(hs, client_resource)
- thirdparty.register_servlets(hs, client_resource)
+ if is_main_process:
+ thirdparty.register_servlets(hs, client_resource)
sendtodevice.register_servlets(hs, client_resource)
user_directory.register_servlets(hs, client_resource)
- room_upgrade_rest_servlet.register_servlets(hs, client_resource)
+ if is_main_process:
+ room_upgrade_rest_servlet.register_servlets(hs, client_resource)
room_batch.register_servlets(hs, client_resource)
- capabilities.register_servlets(hs, client_resource)
- account_validity.register_servlets(hs, client_resource)
+ if is_main_process:
+ capabilities.register_servlets(hs, client_resource)
+ account_validity.register_servlets(hs, client_resource)
relations.register_servlets(hs, client_resource)
- password_policy.register_servlets(hs, client_resource)
- knock.register_servlets(hs, client_resource)
+ if is_main_process:
+ password_policy.register_servlets(hs, client_resource)
+ knock.register_servlets(hs, client_resource)
# moving to /_synapse/admin
- admin.register_servlets_for_client_rest_resource(hs, client_resource)
+ if is_main_process:
+ admin.register_servlets_for_client_rest_resource(hs, client_resource)
# unstable
- mutual_rooms.register_servlets(hs, client_resource)
- login_token_request.register_servlets(hs, client_resource)
- rendezvous.register_servlets(hs, client_resource)
+ if is_main_process:
+ mutual_rooms.register_servlets(hs, client_resource)
+ login_token_request.register_servlets(hs, client_resource)
+ rendezvous.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 44f622bcce..b4b92f0c99 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -875,19 +875,21 @@ class AccountStatusRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- EmailPasswordRequestTokenRestServlet(hs).register(http_server)
- PasswordRestServlet(hs).register(http_server)
- DeactivateAccountRestServlet(hs).register(http_server)
- EmailThreepidRequestTokenRestServlet(hs).register(http_server)
- MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
- AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
- AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+ PasswordRestServlet(hs).register(http_server)
+ DeactivateAccountRestServlet(hs).register(http_server)
+ EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+ MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
+ AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
+ AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
- ThreepidAddRestServlet(hs).register(http_server)
- ThreepidBindRestServlet(hs).register(http_server)
- ThreepidUnbindRestServlet(hs).register(http_server)
- ThreepidDeleteRestServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ ThreepidAddRestServlet(hs).register(http_server)
+ ThreepidBindRestServlet(hs).register(http_server)
+ ThreepidUnbindRestServlet(hs).register(http_server)
+ ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
- if hs.config.experimental.msc3720_enabled:
+ if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
AccountStatusRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 69b803f9f8..486c6dbbc5 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -342,8 +342,10 @@ class ClaimDehydratedDeviceServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- DeleteDevicesRestServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
- DeviceRestServlet(hs).register(http_server)
- DehydratedDeviceServlet(hs).register(http_server)
- ClaimDehydratedDeviceServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ DeviceRestServlet(hs).register(http_server)
+ DehydratedDeviceServlet(hs).register(http_server)
+ ClaimDehydratedDeviceServlet(hs).register(http_server)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index ee038c7192..7873b363c0 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -376,5 +376,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
KeyQueryServlet(hs).register(http_server)
KeyChangesServlet(hs).register(http_server)
OneTimeKeyServlet(hs).register(http_server)
- SigningKeyUploadServlet(hs).register(http_server)
- SignaturesUploadServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ SigningKeyUploadServlet(hs).register(http_server)
+ SignaturesUploadServlet(hs).register(http_server)
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index de810ae3ec..3cb1e7e375 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -949,9 +949,10 @@ def _calculate_registration_flows(
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- EmailRegisterRequestTokenRestServlet(hs).register(http_server)
- MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
- UsernameAvailabilityRestServlet(hs).register(http_server)
- RegistrationSubmitTokenServlet(hs).register(http_server)
+ if hs.config.worker.worker_app is None:
+ EmailRegisterRequestTokenRestServlet(hs).register(http_server)
+ MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
+ UsernameAvailabilityRestServlet(hs).register(http_server)
+ RegistrationSubmitTokenServlet(hs).register(http_server)
RegistrationTokenValidityRestServlet(hs).register(http_server)
RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 636cc62877..514eb6afc8 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -396,12 +396,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
- try:
- content = parse_json_object_from_request(request)
- except Exception:
- # Turns out we used to ignore the body entirely, and some clients
- # cheekily send invalid bodies.
- content = {}
+ content = parse_json_object_from_request(request, allow_empty_body=True)
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
@@ -952,12 +947,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
}:
raise AuthError(403, "Guest access not allowed")
- try:
- content = parse_json_object_from_request(request)
- except Exception:
- # Turns out we used to ignore the body entirely, and some clients
- # cheekily send invalid bodies.
- content = {}
+ content = parse_json_object_from_request(request, allow_empty_body=True)
if membership_action == "invite" and all(
key in content for key in ("medium", "address")
@@ -1395,9 +1385,7 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet):
)
-def register_servlets(
- hs: "HomeServer", http_server: HttpServer, is_worker: bool = False
-) -> None:
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
RoomStateEventRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server)
JoinedRoomMemberListRestServlet(hs).register(http_server)
@@ -1421,7 +1409,7 @@ def register_servlets(
TimestampLookupRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process.
- if not is_worker:
+ if hs.config.worker.worker_app is None:
RoomForgetRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py
index 46a8b03829..55d52f0b28 100644
--- a/synapse/rest/client/sendtodevice.py
+++ b/synapse/rest/client/sendtodevice.py
@@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 3c0a90010b..e19c0946c0 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -77,6 +77,7 @@ class VersionsRestServlet(RestServlet):
"v1.2",
"v1.3",
"v1.4",
+ "v1.5",
],
# as per MSC1497:
"unstable_features": {
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..48a54d9cb8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -26,8 +26,15 @@ from typing import (
cast,
)
+from synapse.api.constants import EventContentFields
from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.opentracing import (
+ SynapseTags,
+ log_kv,
+ set_tag,
+ start_active_span,
+ trace,
+)
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(recipient_user_id, recipient_device_id), []
).append(message_dict)
+ # start a new span for each message, so that we can tag each separately
+ with start_active_span("get_to_device_message"):
+ set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
+ set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
+ set_tag(
+ SynapseTags.TO_DEVICE_MSGID,
+ message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
+ )
+
if limit is not None and rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that
@@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
- if remote_messages_by_destination:
- issue9533_logger.debug(
- "Queued outgoing to-device messages with stream_id %i for %s",
- stream_id,
- list(remote_messages_by_destination.keys()),
- )
+ for destination, edu in remote_messages_by_destination.items():
+ if issue9533_logger.isEnabledFor(logging.DEBUG):
+ issue9533_logger.debug(
+ "Queued outgoing to-device messages with "
+ "stream_id %i, EDU message_id %s, type %s for %s: %s",
+ stream_id,
+ edu["message_id"],
+ edu["type"],
+ destination,
+ [
+ f"{user_id}/{device_id} (msgid "
+ f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
+ for (user_id, messages_by_device) in edu["messages"].items()
+ for (device_id, msg) in messages_by_device.items()
+ ],
+ )
+
+ for (user_id, messages_by_device) in edu["messages"].items():
+ for (device_id, msg) in messages_by_device.items():
+ with start_active_span("store_outgoing_to_device_message"):
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
+ set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+ set_tag(
+ SynapseTags.TO_DEVICE_MSGID,
+ msg.get(EventContentFields.TO_DEVICE_MSGID),
+ )
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
@@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# Only insert into the local inbox if the device exists on
# this server
device_id = row["device_id"]
- message_json = json_encoder.encode(messages_by_device[device_id])
+
+ with start_active_span("serialise_to_device_message"):
+ msg = messages_by_device[device_id]
+ set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
+ set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+ set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+ set_tag(
+ SynapseTags.TO_DEVICE_MSGID,
+ msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
+ )
+ message_json = json_encoder.encode(msg)
+
messages_json_for_user[device_id] = message_json
if messages_json_for_user:
@@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
- issue9533_logger.debug(
- "Stored to-device messages with stream_id %i for %s",
- stream_id,
- [
- (user_id, device_id)
- for (user_id, messages_by_device) in local_by_user_then_device.items()
- for device_id in messages_by_device.keys()
- ],
- )
+ if issue9533_logger.isEnabledFor(logging.DEBUG):
+ issue9533_logger.debug(
+ "Stored to-device messages with stream_id %i: %s",
+ stream_id,
+ [
+ f"{user_id}/{device_id} (msgid "
+ f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
+ for (
+ user_id,
+ messages_by_device,
+ ) in messages_by_user_then_device.items()
+ for (device_id, msg) in messages_by_device.items()
+ ],
+ )
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 534f7fc04a..a5bb4d404e 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -58,7 +58,10 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.caches.stream_change_cache import (
+ AllEntitiesChangedResult,
+ StreamChangeCache,
+)
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -799,7 +802,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def get_cached_device_list_changes(
self,
from_key: int,
- ) -> Optional[List[str]]:
+ ) -> AllEntitiesChangedResult:
"""Get set of users whose devices have changed since `from_key`, or None
if that information is not in our cache.
"""
@@ -807,10 +810,58 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return self._device_list_stream_cache.get_all_entities_changed(from_key)
@cancellable
+ async def get_all_devices_changed(
+ self,
+ from_key: int,
+ to_key: int,
+ ) -> Set[str]:
+ """Get all users whose devices have changed in the given range.
+
+ Args:
+ from_key: The minimum device lists stream token to query device list
+ changes for, exclusive.
+ to_key: The maximum device lists stream token to query device list
+ changes for, inclusive.
+
+ Returns:
+ The set of user_ids whose devices have changed since `from_key`
+ (exclusive) until `to_key` (inclusive).
+ """
+
+ result = self._device_list_stream_cache.get_all_entities_changed(from_key)
+
+ if result.hit:
+ # We know which users might have changed devices.
+ if not result.entities:
+ # If no users then we can return early.
+ return set()
+
+ # Otherwise we need to filter down the list
+ return await self.get_users_whose_devices_changed(
+ from_key, result.entities, to_key
+ )
+
+ # If the cache didn't tell us anything, we just need to query the full
+ # range.
+ sql = """
+ SELECT DISTINCT user_id FROM device_lists_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ """
+
+ rows = await self.db_pool.execute(
+ "get_all_devices_changed",
+ None,
+ sql,
+ from_key,
+ to_key,
+ )
+ return {u for u, in rows}
+
+ @cancellable
async def get_users_whose_devices_changed(
self,
from_key: int,
- user_ids: Optional[Collection[str]] = None,
+ user_ids: Collection[str],
to_key: Optional[int] = None,
) -> Set[str]:
"""Get set of users whose devices have changed since `from_key` that
@@ -830,46 +881,31 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
"""
# Get set of users who *may* have changed. Users not in the returned
# list have definitely not changed.
- user_ids_to_check: Optional[Collection[str]]
- if user_ids is None:
- # Get set of all users that have had device list changes since 'from_key'
- user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed(
- from_key
- )
- else:
- # The same as above, but filter results to only those users in 'user_ids'
- user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
- user_ids, from_key
- )
+ user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
+ user_ids, from_key
+ )
+ # If an empty set was returned, there's nothing to do.
if not user_ids_to_check:
return set()
- def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
- changes: Set[str] = set()
-
- stream_id_where_clause = "stream_id > ?"
- sql_args = [from_key]
-
- if to_key:
- stream_id_where_clause += " AND stream_id <= ?"
- sql_args.append(to_key)
+ if to_key is None:
+ to_key = self._device_list_id_gen.get_current_token()
- sql = f"""
+ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
+ sql = """
SELECT DISTINCT user_id FROM device_lists_stream
- WHERE {stream_id_where_clause}
- AND
+ WHERE ? < stream_id AND stream_id <= ? AND %s
"""
+ changes: Set[str] = set()
+
# Query device changes with a batch of users at a time
- # Assertion for mypy's benefit; see also
- # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
- assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
- txn.execute(sql + clause, sql_args + args)
+ txn.execute(sql % (clause,), [from_key, to_key] + args)
changes.update(user_id for user_id, in txn)
return changes
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b283ab0f9c..7ebe34f773 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -74,6 +74,7 @@ receipt.
"""
import logging
+from collections import defaultdict
from typing import (
TYPE_CHECKING,
Collection,
@@ -95,6 +96,7 @@ from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
+ PostgresEngine,
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -463,6 +465,153 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return result
+ async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
+ """Get the notification count by room for a user. Only considers notifications,
+ not highlight or unread counts, and threads are currently aggregated under their room.
+
+ This function is intentionally not cached because it is called to calculate the
+ unread badge for push notifications and thus the result is expected to change.
+
+ Note that this function assumes the user is a member of the room. Because
+ summary rows are not removed when a user leaves a room, the caller must
+ filter out those results from the result.
+
+ Returns:
+ A map of room ID to notification counts for the given user.
+ """
+ return await self.db_pool.runInteraction(
+ "get_unread_counts_by_room_for_user",
+ self._get_unread_counts_by_room_for_user_txn,
+ user_id,
+ )
+
+ def _get_unread_counts_by_room_for_user_txn(
+ self, txn: LoggingTransaction, user_id: str
+ ) -> Dict[str, int]:
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+ )
+ args.extend([user_id, user_id])
+
+ receipts_cte = f"""
+ WITH all_receipts AS (
+ SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
+ FROM receipts_linearized
+ LEFT JOIN events USING (room_id, event_id)
+ WHERE
+ {receipt_types_clause}
+ AND user_id = ?
+ GROUP BY room_id, thread_id
+ )
+ """
+
+ receipts_joins = """
+ LEFT JOIN (
+ SELECT room_id, thread_id,
+ max_receipt_stream_ordering AS threaded_receipt_stream_ordering
+ FROM all_receipts
+ WHERE thread_id IS NOT NULL
+ ) AS threaded_receipts USING (room_id, thread_id)
+ LEFT JOIN (
+ SELECT room_id, thread_id,
+ max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
+ FROM all_receipts
+ WHERE thread_id IS NULL
+ ) AS unthreaded_receipts USING (room_id)
+ """
+
+ # First get summary counts by room / thread for the user. We use the max receipt
+ # stream ordering of both threaded & unthreaded receipts to compare against the
+ # summary table.
+ #
+ # PostgreSQL and SQLite differ in comparing scalar numerics.
+ if isinstance(self.database_engine, PostgresEngine):
+ # GREATEST ignores NULLs.
+ max_clause = """GREATEST(
+ threaded_receipt_stream_ordering,
+ unthreaded_receipt_stream_ordering
+ )"""
+ else:
+ # MAX returns NULL if any are NULL, so COALESCE to 0 first.
+ max_clause = """MAX(
+ COALESCE(threaded_receipt_stream_ordering, 0),
+ COALESCE(unthreaded_receipt_stream_ordering, 0)
+ )"""
+
+ sql = f"""
+ {receipts_cte}
+ SELECT eps.room_id, eps.thread_id, notif_count
+ FROM event_push_summary AS eps
+ {receipts_joins}
+ WHERE user_id = ?
+ AND notif_count != 0
+ AND (
+ (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause})
+ OR last_receipt_stream_ordering = {max_clause}
+ )
+ """
+ txn.execute(sql, args)
+
+ seen_thread_ids = set()
+ room_to_count: Dict[str, int] = defaultdict(int)
+
+ for room_id, thread_id, notif_count in txn:
+ room_to_count[room_id] += notif_count
+ seen_thread_ids.add(thread_id)
+
+ # Now get any event push actions that haven't been rotated using the same OR
+ # join and filter by receipt and event push summary rotated up to stream ordering.
+ sql = f"""
+ {receipts_cte}
+ SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+ FROM event_push_actions AS epa
+ {receipts_joins}
+ WHERE user_id = ?
+ AND epa.notif = 1
+ AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
+ AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+ AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+ GROUP BY epa.room_id, epa.thread_id
+ """
+ txn.execute(sql, args)
+
+ for room_id, thread_id, notif_count in txn:
+ # Note: only count push actions we have valid summaries for with up to date receipt.
+ if thread_id not in seen_thread_ids:
+ continue
+ room_to_count[room_id] += notif_count
+
+ thread_id_clause, thread_ids_args = make_in_list_sql_clause(
+ self.database_engine, "epa.thread_id", seen_thread_ids
+ )
+
+ # Finally re-check event_push_actions for any rooms not in the summary, ignoring
+ # the rotated up-to position. This handles the case where a read receipt has arrived
+ # but not been rotated meaning the summary table is out of date, so we go back to
+ # the push actions table.
+ sql = f"""
+ {receipts_cte}
+ SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+ FROM event_push_actions AS epa
+ {receipts_joins}
+ WHERE user_id = ?
+ AND NOT {thread_id_clause}
+ AND epa.notif = 1
+ AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+ AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+ GROUP BY epa.room_id
+ """
+
+ args.extend(thread_ids_args)
+ txn.execute(sql, args)
+
+ for room_id, notif_count in txn:
+ room_to_count[room_id] += notif_count
+
+ return room_to_count
+
@cached(tree=True, max_entries=5000, iterable=True)
async def get_unread_event_push_actions_by_room_for_user(
self,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 1309bfd374..78906a5e1d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1,5 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -50,8 +50,14 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import IdGenerator
+from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
+ IdGenerator,
+ MultiWriterIdGenerator,
+ StreamIdGenerator,
+)
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self.config: HomeServerConfig = hs.config
+ self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
+
+ if isinstance(database.engine, PostgresEngine):
+ self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="un_partial_stated_room_stream",
+ instance_name=self._instance_name,
+ tables=[
+ ("un_partial_stated_room_stream", "instance_name", "stream_id")
+ ],
+ sequence_name="un_partial_stated_room_stream_sequence",
+ # TODO(faster_joins, multiple writers) Support multiple writers.
+ writers=["master"],
+ )
+ else:
+ self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
+ db_conn, "un_partial_stated_room_stream", "stream_id"
+ )
+
async def store_room(
self,
room_id: str,
@@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
- async def clear_partial_state_room(self, room_id: str) -> bool:
- """Clears the partial state flag for a room.
-
- Args:
- room_id: The room whose partial state flag is to be cleared.
-
- Returns:
- `True` if the partial state flag has been cleared successfully.
-
- `False` if the partial state flag could not be cleared because the room
- still contains events with partial state.
- """
- try:
- await self.db_pool.runInteraction(
- "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
- )
- return True
- except self.db_pool.engine.module.IntegrityError as e:
- # Assume that any `IntegrityError`s are due to partial state events.
- logger.info(
- "Exception while clearing lazy partial-state-room %s, retrying: %s",
- room_id,
- e,
- )
- return False
-
- def _clear_partial_state_room_txn(
- self, txn: LoggingTransaction, room_id: str
- ) -> None:
- DatabasePool.simple_delete_txn(
- txn,
- table="partial_state_rooms_servers",
- keyvalues={"room_id": room_id},
- )
- DatabasePool.simple_delete_one_txn(
- txn,
- table="partial_state_rooms",
- keyvalues={"room_id": room_id},
- )
- self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
- self._invalidate_cache_and_stream(
- txn, self.get_partial_state_servers_at_join, (room_id,)
- )
-
- # We now delete anything from `device_lists_remote_pending` with a
- # stream ID less than the minimum
- # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
- device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
- txn,
- table="partial_state_rooms",
- keyvalues={},
- retcol="MIN(device_lists_stream_id)",
- allow_none=True,
- )
- if device_lists_stream_id is None:
- # There are no rooms being currently partially joined, so we delete everything.
- txn.execute("DELETE FROM device_lists_remote_pending")
- else:
- sql = """
- DELETE FROM device_lists_remote_pending
- WHERE stream_id <= ?
- """
- txn.execute(sql, (device_lists_stream_id,))
-
@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
return result["join_event_id"], result["device_lists_stream_id"]
+ def get_un_partial_stated_rooms_token(self) -> int:
+ # TODO(faster_joins, multiple writers): This is inappropriate if there
+ # are multiple writers because workers that don't write often will
+ # hold all readers up.
+ # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
+ # explanation.)
+ return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+
+ async def get_un_partial_stated_rooms_from_stream(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+ """Get updates for caches replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_un_partial_stated_rooms_from_stream_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+ sql = """
+ SELECT stream_id, room_id
+ FROM un_partial_stated_room_stream
+ WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
+ updates = [(row[0], (row[1],)) for row in txn]
+ limited = False
+ upto_token = current_id
+ if len(updates) >= limit:
+ upto_token = updates[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_rooms_from_stream",
+ get_un_partial_stated_rooms_from_stream_txn,
+ )
+
class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
+ self._instance_name = hs.get_instance_name()
+
async def upsert_room_on_join(
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
) -> None:
@@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self.is_room_blocked,
(room_id,),
)
+
+ async def clear_partial_state_room(self, room_id: str) -> bool:
+ """Clears the partial state flag for a room.
+
+ Args:
+ room_id: The room whose partial state flag is to be cleared.
+
+ Returns:
+ `True` if the partial state flag has been cleared successfully.
+
+ `False` if the partial state flag could not be cleared because the room
+ still contains events with partial state.
+ """
+ try:
+ async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
+ await self.db_pool.runInteraction(
+ "clear_partial_state_room",
+ self._clear_partial_state_room_txn,
+ room_id,
+ un_partial_state_room_stream_id,
+ )
+ return True
+ except self.db_pool.engine.module.IntegrityError as e:
+ # Assume that any `IntegrityError`s are due to partial state events.
+ logger.info(
+ "Exception while clearing lazy partial-state-room %s, retrying: %s",
+ room_id,
+ e,
+ )
+ return False
+
+ def _clear_partial_state_room_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ un_partial_state_room_stream_id: int,
+ ) -> None:
+ DatabasePool.simple_delete_txn(
+ txn,
+ table="partial_state_rooms_servers",
+ keyvalues={"room_id": room_id},
+ )
+ DatabasePool.simple_delete_one_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={"room_id": room_id},
+ )
+ self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_partial_state_servers_at_join, (room_id,)
+ )
+
+ DatabasePool.simple_insert_txn(
+ txn,
+ "un_partial_stated_room_stream",
+ {
+ "stream_id": un_partial_state_room_stream_id,
+ "instance_name": self._instance_name,
+ "room_id": room_id,
+ },
+ )
+
+ # We now delete anything from `device_lists_remote_pending` with a
+ # stream ID less than the minimum
+ # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+ device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={},
+ retcol="MIN(device_lists_stream_id)",
+ allow_none=True,
+ )
+ if device_lists_stream_id is None:
+ # There are no rooms being currently partially joined, so we delete everything.
+ txn.execute("DELETE FROM device_lists_remote_pending")
+ else:
+ sql = """
+ DELETE FROM device_lists_remote_pending
+ WHERE stream_id <= ?
+ """
+ txn.execute(sql, (device_lists_stream_id,))
diff --git a/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
new file mode 100644
index 0000000000..743196cfe3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
@@ -0,0 +1,32 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that a room has become un-partial-stated.
+CREATE TABLE un_partial_stated_room_stream(
+ -- Position in the stream
+ stream_id BIGINT PRIMARY KEY NOT NULL,
+
+ -- Which instance wrote this entry.
+ instance_name TEXT NOT NULL,
+
+ -- Which room has been un-partial-stated.
+ room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting a room, the database needs to be able to check here.
+-- This index is not unique because we can join a room multiple times in a server's lifetime,
+-- so the same room could be un-partial-stated multiple times!
+CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id);
diff --git a/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..c1aac0b385
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_room_stream_sequence;
+
+SELECT setval('un_partial_stated_room_stream_sequence', (
+ SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_room_stream
+));
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 666f4b6895..c8b17acb59 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -16,6 +16,7 @@ import logging
import math
from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union
+import attr
from sortedcontainers import SortedDict
from synapse.util import caches
@@ -26,14 +27,41 @@ logger = logging.getLogger(__name__)
EntityType = str
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class AllEntitiesChangedResult:
+ """Return type of `get_all_entities_changed`.
+
+ Callers must check that there was a cache hit, via `result.hit`, before
+ using the entities in `result.entities`.
+
+ This specifically does *not* implement helpers such as `__bool__` to ensure
+ that callers do the correct checks.
+ """
+
+ _entities: Optional[List[EntityType]]
+
+ @property
+ def hit(self) -> bool:
+ return self._entities is not None
+
+ @property
+ def entities(self) -> List[EntityType]:
+ assert self._entities is not None
+ return self._entities
+
+
class StreamChangeCache:
- """Keeps track of the stream positions of the latest change in a set of entities.
+ """
+ Keeps track of the stream positions of the latest change in a set of entities.
- Typically the entity will be a room or user id.
+ The entity will is typically a room ID or user ID, but can be any string.
- Given a list of entities and a stream position, it will give a subset of
- entities that may have changed since that position. If position key is too
- old then the cache will simply return all given entities.
+ Can be queried for whether a specific entity has changed after a stream position
+ or for a list of changed entities after a stream position. See the individual
+ methods for more information.
+
+ Only tracks to a maximum cache size, any position earlier than the earliest
+ known stream position must be treated as unknown.
"""
def __init__(
@@ -45,16 +73,20 @@ class StreamChangeCache:
) -> None:
self._original_max_size: int = max_size
self._max_size = math.floor(max_size)
- self._entity_to_key: Dict[EntityType, int] = {}
- # map from stream id to the a set of entities which changed at that stream id.
+ # map from stream id to the set of entities which changed at that stream id.
self._cache: SortedDict[int, Set[EntityType]] = SortedDict()
+ # map from entity to the stream ID of the latest change for that entity.
+ #
+ # Must be kept in sync with _cache.
+ self._entity_to_key: Dict[EntityType, int] = {}
# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
# stream_pos for which we know _cache is valid.
#
self._earliest_known_stream_pos = current_stream_pos
+
self.name = name
self.metrics = caches.register_cache(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
@@ -82,22 +114,46 @@ class StreamChangeCache:
return False
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
- """Returns True if the entity may have been updated since stream_pos"""
+ """
+ Returns True if the entity may have been updated after stream_pos.
+
+ Args:
+ entity: The entity to check for changes.
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+ True if the entity may have been updated, this happens if:
+ * The given stream position is at or earlier than the earliest
+ known stream position.
+ * The given stream position is earlier than the latest change for
+ the entity.
+
+ False otherwise:
+ * The entity is unknown.
+ * The given stream position is at or later than the latest change
+ for the entity.
+ """
assert isinstance(stream_pos, int)
- if stream_pos < self._earliest_known_stream_pos:
+ # _cache is not valid at or before the earliest known stream position, so
+ # return that the entity has changed.
+ if stream_pos <= self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
+ # If the entity is unknown, it hasn't changed.
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
self.metrics.inc_hits()
return False
+ # This is a known entity, return true if the stream position is earlier
+ # than the last change.
if stream_pos < latest_entity_change_pos:
self.metrics.inc_misses()
return True
+ # Otherwise, the stream position is after the latest change: return false.
self.metrics.inc_hits()
return False
@@ -105,23 +161,35 @@ class StreamChangeCache:
self, entities: Collection[EntityType], stream_pos: int
) -> Union[Set[EntityType], FrozenSet[EntityType]]:
"""
- Returns subset of entities that have had new things since the given
- position. Entities unknown to the cache will be returned. If the
- position is too old it will just return the given list.
+ Returns the subset of the given entities that have had changes after the given position.
+
+ Entities unknown to the cache will be returned.
+
+ If the position is too old it will just return the given list.
+
+ Args:
+ entities: Entities to check for changes.
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+ A subset of entities which have changed after the given stream position.
+
+ This will be all entities if the given stream position is at or earlier
+ than the earliest known stream position.
"""
- changed_entities = self.get_all_entities_changed(stream_pos)
- if changed_entities is not None:
+ cache_result = self.get_all_entities_changed(stream_pos)
+ if cache_result.hit:
# We now do an intersection, trying to do so in the most efficient
# way possible (some of these sets are *large*). First check in the
- # given iterable is already set that we can reuse, otherwise we
+ # given iterable is already a set that we can reuse, otherwise we
# create a set of the *smallest* of the two iterables and call
# `intersection(..)` on it (this can be twice as fast as the reverse).
if isinstance(entities, (set, frozenset)):
- result = entities.intersection(changed_entities)
- elif len(changed_entities) < len(entities):
- result = set(changed_entities).intersection(entities)
+ result = entities.intersection(cache_result.entities)
+ elif len(cache_result.entities) < len(entities):
+ result = set(cache_result.entities).intersection(entities)
else:
- result = set(entities).intersection(changed_entities)
+ result = set(entities).intersection(cache_result.entities)
self.metrics.inc_hits()
else:
result = set(entities)
@@ -130,43 +198,75 @@ class StreamChangeCache:
return result
def has_any_entity_changed(self, stream_pos: int) -> bool:
- """Returns if any entity has changed"""
- assert type(stream_pos) is int
+ """
+ Returns true if any entity has changed after the given stream position.
+
+ Args:
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+ True if any entity has changed after the given stream position or
+ if the given stream position is at or earlier than the earliest
+ known stream position.
+
+ False otherwise.
+ """
+ assert isinstance(stream_pos, int)
if not self._cache:
# If the cache is empty, nothing can have changed.
return False
- if stream_pos >= self._earliest_known_stream_pos:
- self.metrics.inc_hits()
- return self._cache.bisect_right(stream_pos) < len(self._cache)
- else:
+ # _cache is not valid at or before the earliest known stream position, so
+ # return that an entity has changed.
+ if stream_pos <= self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
- def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
- """Returns all entities that have had new things since the given
- position. If the position is too old it will return None.
+ self.metrics.inc_hits()
+ return stream_pos < self._cache.peekitem()[0]
+
+ def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
+ """
+ Returns all entities that have had changes after the given position.
+
+ If the stream change cache does not go far enough back, i.e. the
+ position is too old, it will return None.
Returns the entities in the order that they were changed.
+
+ Args:
+ stream_pos: The stream position to check for changes after.
+
+ Return:
+ A class indicating if we have the requested data cached, and if so
+ includes the entities in the order they were changed.
"""
- assert type(stream_pos) is int
+ assert isinstance(stream_pos, int)
- if stream_pos < self._earliest_known_stream_pos:
- return None
+ # _cache is not valid at or before the earliest known stream position, so
+ # return None to mark that it is unknown if an entity has changed.
+ if stream_pos <= self._earliest_known_stream_pos:
+ return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
changed_entities.extend(self._cache[k])
- return changed_entities
+ return AllEntitiesChangedResult(changed_entities)
def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
- """Informs the cache that the entity has been changed at the given
- position.
"""
- assert type(stream_pos) is int
+ Informs the cache that the entity has been changed at the given position.
+
+ Args:
+ entity: The entity to mark as changed.
+ stream_pos: The stream position to update the entity to.
+ """
+ assert isinstance(stream_pos, int)
+ # For a change before _cache is valid (e.g. at or before the earliest known
+ # stream position) there's nothing to do.
if stream_pos <= self._earliest_known_stream_pos:
return
@@ -189,6 +289,11 @@ class StreamChangeCache:
self._evict()
def _evict(self) -> None:
+ """
+ Ensure the cache has not exceeded the maximum size.
+
+ Evicts entries until it is at the maximum size.
+ """
# if the cache is too big, remove entries
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
@@ -199,5 +304,12 @@ class StreamChangeCache:
def get_max_pos_of_last_change(self, entity: EntityType) -> int:
"""Returns an upper bound of the stream id of the last change to an
entity.
+
+ Args:
+ entity: The entity to check.
+
+ Return:
+ The stream position of the latest change for the given entity or
+ the earliest known stream position if the entitiy is unknown.
"""
return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
|