diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 2056ecb2c3..a99aea8926 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -544,6 +544,48 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index.
"""
+ async def updater(progress: JsonDict, batch_size: int) -> int:
+ await self.create_index_in_background(
+ index_name=index_name,
+ table=table,
+ columns=columns,
+ where_clause=where_clause,
+ unique=unique,
+ psql_only=psql_only,
+ replaces_index=replaces_index,
+ )
+ await self._end_background_update(update_name)
+ return 1
+
+ self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
+ updater, oneshot=True
+ )
+
+ async def create_index_in_background(
+ self,
+ index_name: str,
+ table: str,
+ columns: Iterable[str],
+ where_clause: Optional[str] = None,
+ unique: bool = False,
+ psql_only: bool = False,
+ replaces_index: Optional[str] = None,
+ ) -> None:
+ """Add an index in the background.
+
+ Args:
+ index_name: name of index to add
+ table: table to add index to
+ columns: columns/expressions to include in index
+ where_clause: A WHERE clause to specify a partial unique index.
+ unique: true to make a UNIQUE index
+ psql_only: true to only create this index on psql databases (useful
+ for virtual sqlite tables)
+ replaces_index: The name of an index that this index replaces.
+ The named index will be dropped upon completion of the new index.
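+
+ Example, as called from a data store (this mirrors the receipts unique
+ index background updates):
+
+ await self.db_pool.updates.create_index_in_background(
+ index_name="receipts_linearized_unique_index",
+ table="receipts_linearized",
+ columns=["room_id", "receipt_type", "user_id"],
+ where_clause="thread_id IS NULL",
+ unique=True,
+ )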
+ """
+
def create_index_psql(conn: Connection) -> None:
conn.rollback()
# postgres insists on autocommit for the index
@@ -618,16 +660,11 @@ class BackgroundUpdater:
else:
runner = create_index_sqlite
- async def updater(progress: JsonDict, batch_size: int) -> int:
- if runner is not None:
- logger.info("Adding index %s to %s", index_name, table)
- await self.db_pool.runWithConnection(runner)
- await self._end_background_update(update_name)
- return 1
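+ # runner is None when the index is marked psql_only and we are running
+ # against SQLite, in which case there is nothing to do.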
+ if runner is None:
+ return
- self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
- updater, oneshot=True
- )
+ logger.info("Adding index %s to %s", index_name, table)
+ await self.db_pool.runWithConnection(runner)
async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 95d4c0622d..a5bb4d404e 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1569,77 +1569,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
return rows
- async def check_too_many_devices_for_user(
- self, user_id: str, limit: int
- ) -> List[str]:
- """Check if the user has a lot of devices, and if so return the set of
- devices we can prune.
-
- This does *not* return hidden devices or devices with E2E keys.
-
- Returns at most `limit` number of devices, ordered by last seen.
- """
-
- num_devices = await self.db_pool.simple_select_one_onecol(
- table="devices",
- keyvalues={"user_id": user_id, "hidden": False},
- retcol="COALESCE(COUNT(*), 0)",
- desc="count_devices",
- )
-
- # We let users have up to ten devices without pruning.
- if num_devices <= 10:
- return []
-
- # We prune everything older than N days.
- max_last_seen = self._clock.time_msec() - 14 * 24 * 60 * 60 * 1000
-
- if num_devices > 50:
- # If the user has more than 50 devices, then we chose a last seen
- # that ensures we keep at most 50 devices.
- sql = """
- SELECT last_seen FROM devices
- LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
- WHERE
- user_id = ?
- AND NOT hidden
- AND last_seen IS NOT NULL
- AND key_json IS NULL
- ORDER BY last_seen DESC
- LIMIT 1
- OFFSET 50
- """
-
- rows = await self.db_pool.execute(
- "check_too_many_devices_for_user_last_seen", None, sql, (user_id,)
- )
- if rows:
- max_last_seen = max(rows[0][0], max_last_seen)
-
- # Now fetch the devices to delete.
- sql = """
- SELECT device_id FROM devices
- LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
- WHERE
- user_id = ?
- AND NOT hidden
- AND last_seen < ?
- AND key_json IS NULL
- ORDER BY last_seen
- LIMIT ?
- """
-
- def check_too_many_devices_for_user_txn(
- txn: LoggingTransaction,
- ) -> List[str]:
- txn.execute(sql, (user_id, max_last_seen, limit))
- return [device_id for device_id, in txn]
-
- return await self.db_pool.runInteraction(
- "check_too_many_devices_for_user",
- check_too_many_devices_for_user_txn,
- )
-
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Because we have write access, this will be a StreamIdGenerator
@@ -1698,7 +1627,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
values={},
insertion_values={
"display_name": initial_device_display_name,
- "last_seen": self._clock.time_msec(),
"hidden": False,
},
desc="store_device",
@@ -1744,15 +1672,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
raise StoreError(500, "Problem storing device.")
- @cached(max_entries=0)
- async def delete_device(self, user_id: str, device_id: str) -> None:
- raise NotImplementedError()
-
- # Note: sometimes deleting rows out of `device_inbox` can take a long time,
- # so we use a cache so that we deduplicate in flight requests to delete
- # devices.
- @cachedList(cached_method_name="delete_device", list_name="device_ids")
- async def delete_devices(self, user_id: str, device_ids: Collection[str]) -> dict:
+ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
"""Deletes several devices.
Args:
@@ -1789,8 +1709,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
for device_id in device_ids:
self.device_id_exists_cache.invalidate((user_id, device_id))
- return {}
-
async def update_device(
self, user_id: str, device_id: str, new_display_name: Optional[str] = None
) -> None:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 643c47d608..4c691642e2 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -140,7 +140,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
@cancellable
async def get_e2e_device_keys_for_cs_api(
self,
- query_list: List[Tuple[str, Optional[str]]],
+ query_list: Collection[Tuple[str, Optional[str]]],
include_displaynames: bool = True,
) -> Dict[str, Dict[str, JsonDict]]:
"""Fetch a list of device keys, formatted suitably for the C/S API.
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index a580e4bdda..e06725f69c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -924,39 +924,6 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
return batch_size
- async def _create_receipts_index(self, index_name: str, table: str) -> None:
- """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
- receipts table, for non-thread receipts."""
-
- def _create_index(conn: LoggingDatabaseConnection) -> None:
- conn.rollback()
-
- # we have to set autocommit, because postgres refuses to
- # CREATE INDEX CONCURRENTLY without it.
- if isinstance(self.database_engine, PostgresEngine):
- conn.set_session(autocommit=True)
-
- try:
- c = conn.cursor()
-
- # Now that the duplicates are gone, we can create the index.
- concurrently = (
- "CONCURRENTLY"
- if isinstance(self.database_engine, PostgresEngine)
- else ""
- )
- sql = f"""
- CREATE UNIQUE INDEX {concurrently} {index_name}
- ON {table}(room_id, receipt_type, user_id)
- WHERE thread_id IS NULL
- """
- c.execute(sql)
- finally:
- if isinstance(self.database_engine, PostgresEngine):
- conn.set_session(autocommit=False)
-
- await self.db_pool.runWithConnection(_create_index)
-
async def _background_receipts_linearized_unique_index(
self, progress: dict, batch_size: int
) -> int:
@@ -999,9 +966,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
_remote_duplicate_receipts_txn,
)
- await self._create_receipts_index(
- "receipts_linearized_unique_index",
- "receipts_linearized",
+ await self.db_pool.updates.create_index_in_background(
+ index_name="receipts_linearized_unique_index",
+ table="receipts_linearized",
+ columns=["room_id", "receipt_type", "user_id"],
+ where_clause="thread_id IS NULL",
+ unique=True,
)
await self.db_pool.updates._end_background_update(
@@ -1050,9 +1020,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
_remote_duplicate_receipts_txn,
)
- await self._create_receipts_index(
- "receipts_graph_unique_index",
- "receipts_graph",
+ await self.db_pool.updates.create_index_in_background(
+ index_name="receipts_graph_unique_index",
+ table="receipts_graph",
+ columns=["room_id", "receipt_type", "user_id"],
+ where_clause="thread_id IS NULL",
+ unique=True,
)
await self.db_pool.updates._end_background_update(
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index af9952f513..14ef5b040d 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -26,6 +26,14 @@ from typing import (
cast,
)
+try:
+ # Figure out if ICU support is available for searching users.
+ import icu
+
+ USE_ICU = True
+except ModuleNotFoundError:
+ USE_ICU = False
+
from typing_extensions import TypedDict
from synapse.api.errors import StoreError
@@ -900,7 +908,7 @@ def _parse_query_sqlite(search_term: str) -> str:
"""
# Pull out the individual words, discarding any non-word characters.
- results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+ results = _parse_words(search_term)
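+ # e.g. the words ["foo", "bar"] become the FTS query
+ # "(foo* OR foo) & (bar* OR bar)".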
return " & ".join("(%s* OR %s)" % (result, result) for result in results)
@@ -910,12 +918,63 @@ def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
We use this so that we can add prefix matching, which isn't something
that is supported by default.
"""
-
- # Pull out the individual words, discarding any non-word characters.
- results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+ results = _parse_words(search_term)
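+ # e.g. for the words ["foo", "bar"] this produces:
+ # both: "(foo:* | foo) & (bar:* | bar)"
+ # exact: "foo & bar"
+ # prefix: "foo:* & bar:*"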
both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
exact = " & ".join("%s" % (result,) for result in results)
prefix = " & ".join("%s:*" % (result,) for result in results)
return both, exact, prefix
+
+
+def _parse_words(search_term: str) -> List[str]:
+ """Split the provided search string into a list of its words.
+
+ If support for ICU (International Components for Unicode) is available, use it.
+ Otherwise, fall back to using a regex to detect word boundaries. This latter
+ solution works well enough for most latin-based languages, but doesn't work as well
+ with other languages.
+
+ Args:
+ search_term: The search string.
+
+ Returns:
+ A list of the words in the search string.
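+
+ For example, the regex fallback splits "user@example.com" into
+ ["user", "example", "com"]; the ICU tokeniser may split some inputs
+ differently.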
+ """
+ if USE_ICU:
+ return _parse_words_with_icu(search_term)
+
+ return re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+
+
+def _parse_words_with_icu(search_term: str) -> List[str]:
+ """Break down the provided search string into its individual words using ICU
+ (International Components for Unicode).
+
+ Args:
+ search_term: The search string.
+
+ Returns:
+ A list of the words in the search string.
+ """
+ results = []
+ breaker = icu.BreakIterator.createWordInstance(icu.Locale.getDefault())
+ breaker.setText(search_term)
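+ # Walk the word boundaries reported by ICU and take the text between
+ # successive boundaries as a candidate word.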
+ i = 0
+ while True:
+ j = breaker.nextBoundary()
+ if j < 0:
+ break
+
+ result = search_term[i:j]
+
+ # libicu considers spaces and punctuation between words as words, but we don't
+ # want to include those in results as they would result in syntax errors in SQL
+ # queries (e.g. "foo bar" would result in the search query including "foo & &
+ # bar").
+ if len(re.findall(r"([\w\-]+)", result, re.UNICODE)):
+ results.append(result)
+
+ i = j
+
+ return results