diff --git a/synapse/__init__.py b/synapse/__init__.py
index 3cd1ce6070..6369f18a53 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
-__version__ = "1.48.0"
+__version__ = "1.49.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 44883c6663..0bb4b77f84 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -155,7 +155,11 @@ class Auth:
access_token = self.get_access_token_from_request(request)
- user_id, app_service = await self._get_appservice_user_id(request)
+ (
+ user_id,
+ device_id,
+ app_service,
+ ) = await self._get_appservice_user_id_and_device_id(request)
if user_id and app_service:
if ip_addr and self._track_appservice_user_ips:
await self.store.insert_client_ip(
@@ -163,16 +167,22 @@ class Auth:
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
- device_id="dummy-device", # stubbed
+ device_id="dummy-device"
+ if device_id is None
+ else device_id, # stubbed
)
- requester = create_requester(user_id, app_service=app_service)
+ requester = create_requester(
+ user_id, app_service=app_service, device_id=device_id
+ )
request.requester = user_id
if user_id in self._force_tracing_for_users:
opentracing.force_tracing()
opentracing.set_tag("authenticated_entity", user_id)
opentracing.set_tag("user_id", user_id)
+ if device_id is not None:
+ opentracing.set_tag("device_id", device_id)
opentracing.set_tag("appservice_id", app_service.id)
return requester
@@ -274,33 +284,81 @@ class Auth:
403, "Application service has not registered this user (%s)" % user_id
)
- async def _get_appservice_user_id(
+ async def _get_appservice_user_id_and_device_id(
self, request: Request
- ) -> Tuple[Optional[str], Optional[ApplicationService]]:
+ ) -> Tuple[Optional[str], Optional[str], Optional[ApplicationService]]:
+ """
+ Given a request, reads the request parameters to determine:
+ - whether it's an application service that's making this request
+ - what user the application service should be treated as controlling
+ (the user_id URI parameter allows an application service to masquerade
+ any applicable user in its namespace)
+ - what device the application service should be treated as controlling
+ (the device_id[^1] URI parameter allows an application service to masquerade
+ as any device that exists for the relevant user)
+
+ [^1] Unstable and provided by MSC3202.
+ Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
+
+ Returns:
+ 3-tuple of
+ (user ID?, device ID?, application service?)
+
+ Postconditions:
+ - If an application service is returned, so is a user ID
+ - A user ID is never returned without an application service
+ - A device ID is never returned without a user ID or an application service
+ - The returned application service, if present, is permitted to control the
+ returned user ID.
+ - The returned device ID, if present, has been checked to be a valid device ID
+ for the returned user ID.
+ """
+ DEVICE_ID_ARG_NAME = b"org.matrix.msc3202.device_id"
+
app_service = self.store.get_app_service_by_token(
self.get_access_token_from_request(request)
)
if app_service is None:
- return None, None
+ return None, None, None
if app_service.ip_range_whitelist:
ip_address = IPAddress(request.getClientIP())
if ip_address not in app_service.ip_range_whitelist:
- return None, None
+ return None, None, None
# This will always be set by the time Twisted calls us.
assert request.args is not None
- if b"user_id" not in request.args:
- return app_service.sender, app_service
+ if b"user_id" in request.args:
+ effective_user_id = request.args[b"user_id"][0].decode("utf8")
+ await self.validate_appservice_can_control_user_id(
+ app_service, effective_user_id
+ )
+ else:
+ effective_user_id = app_service.sender
- user_id = request.args[b"user_id"][0].decode("utf8")
- await self.validate_appservice_can_control_user_id(app_service, user_id)
+ effective_device_id: Optional[str] = None
- if app_service.sender == user_id:
- return app_service.sender, app_service
+ if (
+ self.hs.config.experimental.msc3202_device_masquerading_enabled
+ and DEVICE_ID_ARG_NAME in request.args
+ ):
+ effective_device_id = request.args[DEVICE_ID_ARG_NAME][0].decode("utf8")
+ # We only just set this so it can't be None!
+ assert effective_device_id is not None
+ device_opt = await self.store.get_device_opt(
+ effective_user_id, effective_device_id
+ )
+ if device_opt is None:
+ # For now, use 400 M_EXCLUSIVE if the device doesn't exist.
+ # This is an open thread of discussion on MSC3202 as of 2021-12-09.
+ raise AuthError(
+ 400,
+ f"Application service trying to use a device that doesn't exist ('{effective_device_id}' for {effective_user_id})",
+ Codes.EXCLUSIVE,
+ )
- return user_id, app_service
+ return effective_user_id, effective_device_id, app_service
async def get_user_by_access_token(
self,
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 1d02f01759..d19165e5b4 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -61,3 +61,8 @@ class ExperimentalConfig(Config):
# MSC3202 (device list updates and OTK counts / fallback keys to appservices).
# Only device lists are supported currently.
self.msc3202_enabled: bool = experimental.get("msc3202_enabled", False)
+
+ # The portion of MSC3202 which is related to device masquerading.
+ self.msc3202_device_masquerading_enabled: bool = experimental.get(
+ "msc3202_device_masquerading", False
+ )
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 3c5e0f7ce7..57316c59b6 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -15,9 +15,8 @@
from typing import List
-from matrix_common.regex import glob_to_regex
-
from synapse.types import JsonDict
+from synapse.util import glob_to_regex
from ._base import Config, ConfigError
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 3e235b57a7..4ca111618f 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -16,12 +16,11 @@ import logging
import os
from typing import List, Optional, Pattern
-from matrix_common.regex import glob_to_regex
-
from OpenSSL import SSL, crypto
from twisted.internet._sslverify import Certificate, trustRootFromCertificates
from synapse.config._base import Config, ConfigError
+from synapse.util import glob_to_regex
logger = logging.getLogger(__name__)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 4697a62c18..8e37e76206 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -28,7 +28,6 @@ from typing import (
Union,
)
-from matrix_common.regex import glob_to_regex
from prometheus_client import Counter, Gauge, Histogram
from twisted.internet import defer
@@ -67,7 +66,7 @@ from synapse.replication.http.federation import (
)
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict, get_domain_from_id
-from synapse.util import json_decoder, unwrapFirstError
+from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_server_name
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 60c11e3d21..b2554bda04 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -65,8 +65,12 @@ class E2eKeysHandler:
else:
# Only register this edu handler on master as it requires writing
# device updates to the db
- #
- # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+ federation_registry.register_edu_handler(
+ "m.signing_key_update",
+ self._edu_updater.incoming_signing_key_update,
+ )
+ # also handle the unstable version
+ # FIXME: remove this when enough servers have upgraded
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index cd64142735..4f42438053 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -406,9 +406,6 @@ class PaginationHandler:
force: set true to skip checking for joined users.
"""
with await self.pagination_lock.write(room_id):
- # check we know about the room
- await self.store.get_room_version_id(room_id)
-
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3df872c578..454d06c973 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -421,7 +421,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
self._on_shutdown,
)
- def _on_shutdown(self) -> None:
+ async def _on_shutdown(self) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2bcdf32dcc..ead2198e14 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1535,20 +1535,13 @@ class RoomShutdownHandler:
await self.store.block_room(room_id, requester_user_id)
if not await self.store.get_room(room_id):
- if block:
- # We allow you to block an unknown room.
- return {
- "kicked_users": [],
- "failed_to_kick_users": [],
- "local_aliases": [],
- "new_room_id": None,
- }
- else:
- # But if you don't want to preventatively block another room,
- # this function can't do anything useful.
- raise NotFoundError(
- "Cannot shut down room: unknown room id %s" % (room_id,)
- )
+ # if we don't know about the room, there is nothing left to do.
+ return {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ebe6fac388..d004c42885 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1482,16 +1482,22 @@ class SyncHandler:
account_data_by_room: Dictionary of per room account data
Returns:
- Returns a 4-tuple whose entries are:
+ Returns a 4-tuple describing rooms the user has joined or left, and users who've
+ joined or left rooms any rooms the user is in. This gets used later in
+ `_generate_sync_entry_for_device_list`.
+
+ Its entries are:
- newly_joined_rooms
- newly_joined_or_invited_or_knocked_users
- newly_left_rooms
- newly_left_users
"""
- # Start by fetching all ephemeral events in rooms we've joined (if required).
+ since_token = sync_result_builder.since_token
+
+ # 1. Start by fetching all ephemeral events in rooms we've joined (if required).
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
- sync_result_builder.since_token is None
+ since_token is None
and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
@@ -1505,9 +1511,8 @@ class SyncHandler:
)
sync_result_builder.now_token = now_token
- # We check up front if anything has changed, if it hasn't then there is
+ # 2. We check up front if anything has changed, if it hasn't then there is
# no point in going further.
- since_token = sync_result_builder.since_token
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
@@ -1520,20 +1525,8 @@ class SyncHandler:
logger.debug("no-oping sync")
return set(), set(), set(), set()
- ignored_account_data = (
- await self.store.get_global_account_data_by_type_for_user(
- AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
- )
- )
-
- # If there is ignored users account data and it matches the proper type,
- # then use it.
- ignored_users: FrozenSet[str] = frozenset()
- if ignored_account_data:
- ignored_users_data = ignored_account_data.get("ignored_users", {})
- if isinstance(ignored_users_data, dict):
- ignored_users = frozenset(ignored_users_data.keys())
-
+ # 3. Work out which rooms need reporting in the sync response.
+ ignored_users = await self._get_ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
@@ -1543,7 +1536,6 @@ class SyncHandler:
)
else:
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
-
tags_by_room = await self.store.get_tags_for_user(user_id)
log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1554,6 +1546,8 @@ class SyncHandler:
newly_joined_rooms = room_changes.newly_joined_rooms
newly_left_rooms = room_changes.newly_left_rooms
+ # 4. We need to apply further processing to `room_entries` (rooms considered
+ # joined or archived).
async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
logger.debug("Generating room entry for %s", room_entry.room_id)
await self._generate_room_entry(
@@ -1572,31 +1566,13 @@ class SyncHandler:
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)
- # Now we want to get any newly joined, invited or knocking users
- newly_joined_or_invited_or_knocked_users = set()
- newly_left_users = set()
- if since_token:
- for joined_sync in sync_result_builder.joined:
- it = itertools.chain(
- joined_sync.timeline.events, joined_sync.state.values()
- )
- for event in it:
- if event.type == EventTypes.Member:
- if (
- event.membership == Membership.JOIN
- or event.membership == Membership.INVITE
- or event.membership == Membership.KNOCK
- ):
- newly_joined_or_invited_or_knocked_users.add(
- event.state_key
- )
- else:
- prev_content = event.unsigned.get("prev_content", {})
- prev_membership = prev_content.get("membership", None)
- if prev_membership == Membership.JOIN:
- newly_left_users.add(event.state_key)
-
- newly_left_users -= newly_joined_or_invited_or_knocked_users
+ # 5. Work out which users have joined or left rooms we're in. We use this
+ # to build the device_list part of the sync response in
+ # `_generate_sync_entry_for_device_list`.
+ (
+ newly_joined_or_invited_or_knocked_users,
+ newly_left_users,
+ ) = sync_result_builder.calculate_user_changes()
return (
set(newly_joined_rooms),
@@ -1605,6 +1581,29 @@ class SyncHandler:
newly_left_users,
)
+ async def _get_ignored_users(self, user_id: str) -> FrozenSet[str]:
+ """Retrieve the users ignored by the given user from their global account_data.
+
+ Returns an empty set if
+ - there is no global account_data entry for ignored_users
+ - there is such an entry, but it's not a JSON object.
+ """
+ # TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
+ ignored_account_data = (
+ await self.store.get_global_account_data_by_type_for_user(
+ AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
+ )
+ )
+
+ # If there is ignored users account data and it matches the proper type,
+ # then use it.
+ ignored_users: FrozenSet[str] = frozenset()
+ if ignored_account_data:
+ ignored_users_data = ignored_account_data.get("ignored_users", {})
+ if isinstance(ignored_users_data, dict):
+ ignored_users = frozenset(ignored_users_data.keys())
+ return ignored_users
+
async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
) -> bool:
@@ -1747,6 +1746,7 @@ class SyncHandler:
if not non_joins:
continue
+ last_non_join = non_joins[-1]
# Check if we have left the room. This can either be because we were
# joined before *or* that we since joined and then left.
@@ -1768,18 +1768,18 @@ class SyncHandler:
newly_left_rooms.append(room_id)
# Only bother if we're still currently invited
- should_invite = non_joins[-1].membership == Membership.INVITE
+ should_invite = last_non_join.membership == Membership.INVITE
if should_invite:
- if event.sender not in ignored_users:
- invite_room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+ if last_non_join.sender not in ignored_users:
+ invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join)
if invite_room_sync:
invited.append(invite_room_sync)
# Only bother if our latest membership in the room is knock (and we haven't
# been accepted/rejected in the meantime).
- should_knock = non_joins[-1].membership == Membership.KNOCK
+ should_knock = last_non_join.membership == Membership.KNOCK
if should_knock:
- knock_room_sync = KnockedSyncResult(room_id, knock=non_joins[-1])
+ knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join)
if knock_room_sync:
knocked.append(knock_room_sync)
@@ -2316,6 +2316,39 @@ class SyncResultBuilder:
groups: Optional[GroupsSyncResult] = None
to_device: List[JsonDict] = attr.Factory(list)
+ def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
+ """Work out which other users have joined or left rooms we are joined to.
+
+ This data only is only useful for an incremental sync.
+
+ The SyncResultBuilder is not modified by this function.
+ """
+ newly_joined_or_invited_or_knocked_users = set()
+ newly_left_users = set()
+ if self.since_token:
+ for joined_sync in self.joined:
+ it = itertools.chain(
+ joined_sync.timeline.events, joined_sync.state.values()
+ )
+ for event in it:
+ if event.type == EventTypes.Member:
+ if (
+ event.membership == Membership.JOIN
+ or event.membership == Membership.INVITE
+ or event.membership == Membership.KNOCK
+ ):
+ newly_joined_or_invited_or_knocked_users.add(
+ event.state_key
+ )
+ else:
+ prev_content = event.unsigned.get("prev_content", {})
+ prev_membership = prev_content.get("membership", None)
+ if prev_membership == Membership.JOIN:
+ newly_left_users.add(event.state_key)
+
+ newly_left_users -= newly_joined_or_invited_or_knocked_users
+ return newly_joined_or_invited_or_knocked_users, newly_left_users
+
@attr.s(slots=True, auto_attribs=True)
class RoomSyncResultBuilder:
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 6bfb4b8d1b..662e60bc33 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -108,6 +108,7 @@ from synapse.types import (
create_requester,
)
from synapse.util import Clock
+from synapse.util.async_helpers import maybe_awaitable
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
@@ -1014,9 +1015,7 @@ class ModuleApi:
run_as_background_process,
msec,
desc,
- f,
- *args,
- **kwargs,
+ lambda: maybe_awaitable(f(*args, **kwargs)),
)
else:
logger.warning(
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 659a53805d..7f68092ec5 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -17,10 +17,9 @@ import logging
import re
from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
-from matrix_common.regex import glob_to_regex, to_word_pattern
-
from synapse.events import EventBase
from synapse.types import JsonDict, UserID
+from synapse.util import glob_to_regex, re_word_boundary
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -185,7 +184,7 @@ class PushRuleEvaluatorForEvent:
r = regex_cache.get((display_name, False, True), None)
if not r:
r1 = re.escape(display_name)
- r1 = to_word_pattern(r1)
+ r1 = re_word_boundary(r1)
r = re.compile(r1, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r
@@ -214,7 +213,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
try:
r = regex_cache.get((glob, True, word_boundary), None)
if not r:
- r = glob_to_regex(glob, word_boundary=word_boundary)
+ r = glob_to_regex(glob, word_boundary)
regex_cache[(glob, True, word_boundary)] = r
return bool(r.search(value))
except re.error:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 386debd7db..7d26954244 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -87,7 +87,6 @@ REQUIREMENTS = [
# with the latest security patches.
"cryptography>=3.4.7",
"ijson>=3.1",
- "matrix-common==1.0.0",
]
CONDITIONAL_REQUIREMENTS = {
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 669ab44a45..829e86675a 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -106,9 +106,6 @@ class RoomRestV2Servlet(RestServlet):
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
)
- if not await self._store.get_room(room_id):
- raise NotFoundError("Unknown room id %s" % (room_id,))
-
delete_id = self._pagination_handler.start_shutdown_and_purge_room(
room_id=room_id,
new_room_user_id=content.get("new_room_user_id"),
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index f62bc03915..db22501c09 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -112,6 +112,8 @@ class DeviceWorkerStore(SQLBaseStore):
A dict containing the device information
Raises:
StoreError: if the device is not found
+ See also:
+ `get_device_opt` which returns None instead if the device is not found
"""
return await self.db_pool.simple_select_one(
table="devices",
@@ -120,6 +122,26 @@ class DeviceWorkerStore(SQLBaseStore):
desc="get_device",
)
+ async def get_device_opt(
+ self, user_id: str, device_id: str
+ ) -> Optional[Dict[str, Any]]:
+ """Retrieve a device. Only returns devices that are not marked as
+ hidden.
+
+ Args:
+ user_id: The ID of the user which owns the device
+ device_id: The ID of the device to retrieve
+ Returns:
+ A dict containing the device information, or None if the device does not exist.
+ """
+ return await self.db_pool.simple_select_one(
+ table="devices",
+ keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
+ retcols=("user_id", "device_id", "display_name"),
+ desc="get_device",
+ allow_none=True,
+ )
+
async def get_devices_by_user(self, user_id: str) -> Dict[str, Dict[str, str]]:
"""Retrieve all of a user's registered devices. Only returns devices
that are not marked as hidden.
@@ -274,7 +296,9 @@ class DeviceWorkerStore(SQLBaseStore):
# add the updated cross-signing keys to the results list
for user_id, result in cross_signing_keys_by_user.items():
result["user_id"] = user_id
- # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+ results.append(("m.signing_key_update", result))
+ # also send the unstable version
+ # FIXME: remove this when enough servers have upgraded
results.append(("org.matrix.signing_key_update", result))
return now_stream_id, results
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 7f3624b128..188afec332 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -56,7 +56,9 @@ class StateDeltasStore(SQLBaseStore):
prev_stream_id = int(prev_stream_id)
# check we're not going backwards
- assert prev_stream_id <= max_stream_id
+ assert (
+ prev_stream_id <= max_stream_id
+ ), f"New stream id {max_stream_id} is smaller than prev stream id {prev_stream_id}"
if not self._curr_state_delta_stream_cache.has_any_entity_changed(
prev_stream_id
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 4ff3013908..b8112e1c05 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -74,8 +74,6 @@ class IdGenerator:
def _load_current_id(
db_conn: LoggingDatabaseConnection, table: str, column: str, step: int = 1
) -> int:
- # debug logging for https://github.com/matrix-org/synapse/issues/7968
- logger.info("initialising stream generator for %s(%s)", table, column)
cur = db_conn.cursor(txn_name="_load_current_id")
if step == 1:
cur.execute("SELECT MAX(%s) FROM %s" % (column, table))
@@ -86,7 +84,9 @@ def _load_current_id(
(val,) = result
cur.close()
current_id = int(val) if val else step
- return (max if step > 0 else min)(current_id, step)
+ res = (max if step > 0 else min)(current_id, step)
+ logger.info("Initialising stream generator for %s(%s): %i", table, column, res)
+ return res
class AbstractStreamIdTracker(metaclass=abc.ABCMeta):
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index f157132210..95f23e27b6 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -14,8 +14,9 @@
import json
import logging
+import re
import typing
-from typing import Any, Callable, Dict, Generator, Optional
+from typing import Any, Callable, Dict, Generator, Optional, Pattern
import attr
from frozendict import frozendict
@@ -34,6 +35,9 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
+_WILDCARD_RUN = re.compile(r"([\?\*]+)")
+
+
def _reject_invalid_json(val: Any) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
@@ -181,3 +185,56 @@ def log_failure(
if not consumeErrors:
return failure
return None
+
+
+def glob_to_regex(glob: str, word_boundary: bool = False) -> Pattern:
+ """Converts a glob to a compiled regex object.
+
+ Args:
+ glob: pattern to match
+ word_boundary: If True, the pattern will be allowed to match at word boundaries
+ anywhere in the string. Otherwise, the pattern is anchored at the start and
+ end of the string.
+
+ Returns:
+ compiled regex pattern
+ """
+
+ # Patterns with wildcards must be simplified to avoid performance cliffs
+ # - The glob `?**?**?` is equivalent to the glob `???*`
+ # - The glob `???*` is equivalent to the regex `.{3,}`
+ chunks = []
+ for chunk in _WILDCARD_RUN.split(glob):
+ # No wildcards? re.escape()
+ if not _WILDCARD_RUN.match(chunk):
+ chunks.append(re.escape(chunk))
+ continue
+
+ # Wildcards? Simplify.
+ qmarks = chunk.count("?")
+ if "*" in chunk:
+ chunks.append(".{%d,}" % qmarks)
+ else:
+ chunks.append(".{%d}" % qmarks)
+
+ res = "".join(chunks)
+
+ if word_boundary:
+ res = re_word_boundary(res)
+ else:
+ # \A anchors at start of string, \Z at end of string
+ res = r"\A" + res + r"\Z"
+
+ return re.compile(res, re.IGNORECASE)
+
+
+def re_word_boundary(r: str) -> str:
+ """
+ Adds word boundary characters to the start and end of an
+ expression to require that the match occur as a whole word,
+ but do so respecting the fact that strings starting or ending
+ with non-word characters will change word boundaries.
+ """
+ # we can't use \b as it chokes on unicode. however \W seems to be okay
+ # as shorthand for [^0-9A-Za-z_].
+ return r"(^|\W)%s(\W|$)" % (r,)
|