diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d0cf3460da..70ca3e09f7 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -324,7 +324,7 @@ class AccountDataWorkerStore(SQLBaseStore):
user_id, int(stream_id)
)
if not changed:
- return ({}, {})
+ return {}, {}
return await self.db_pool.runInteraction(
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index e2d1b758bd..2da2659f41 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -60,7 +60,7 @@ def _make_exclusive_regex(
class ApplicationServiceWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
self.services_cache = load_appservices(
- hs.hostname, hs.config.app_service_config_files
+ hs.hostname, hs.config.appservice.app_service_config_files
)
self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 7a98275d92..7e33ae578c 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -555,8 +555,11 @@ class ClientIpStore(ClientIpWorkerStore):
return ret
async def get_user_ip_and_agents(
- self, user: UserID
+ self, user: UserID, since_ts: int = 0
) -> List[Dict[str, Union[str, int]]]:
+ """
+ Fetch IP/User Agent connections since a given timestamp.
+ """
user_id = user.to_string()
results = {}
@@ -568,13 +571,23 @@ class ClientIpStore(ClientIpWorkerStore):
) = key
if uid == user_id:
user_agent, _, last_seen = self._batch_row_update[key]
- results[(access_token, ip)] = (user_agent, last_seen)
+ if last_seen >= since_ts:
+ results[(access_token, ip)] = (user_agent, last_seen)
- rows = await self.db_pool.simple_select_list(
- table="user_ips",
- keyvalues={"user_id": user_id},
- retcols=["access_token", "ip", "user_agent", "last_seen"],
- desc="get_user_ip_and_agents",
+ def get_recent(txn):
+ txn.execute(
+ """
+ SELECT access_token, ip, user_agent, last_seen FROM user_ips
+ WHERE last_seen >= ? AND user_id = ?
+ ORDER BY last_seen
+ DESC
+ """,
+ (since_ts, user_id),
+ )
+ return txn.fetchall()
+
+ rows = await self.db_pool.runInteraction(
+ desc="get_user_ip_and_agents", func=get_recent
)
results.update(
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index c55508867d..3154906d45 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -136,7 +136,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id, last_stream_id
)
if not has_changed:
- return ([], current_stream_id)
+ return [], current_stream_id
def get_new_messages_for_device_txn(txn):
sql = (
@@ -240,11 +240,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
if not has_changed or last_stream_id == current_stream_id:
log_kv({"message": "No new messages in stream"})
- return ([], current_stream_id)
+ return [], current_stream_id
if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction.
- return ([], last_stream_id)
+ return [], last_stream_id
@trace
def get_new_messages_for_remote_destination_txn(txn):
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 1f0a39eac4..a95ac34f09 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -824,6 +824,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
if otk_row is None:
return None
+ self._invalidate_cache_and_stream(
+ txn, self.count_e2e_one_time_keys, (user_id, device_id)
+ )
+
key_id, key_json = otk_row
return f"{algorithm}:{key_id}", key_json
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 047782eb06..10184d6ae7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
LIMIT ?
"""
- # Find any chunk connections of a given insertion event
- chunk_connection_query = """
+ # Find any batch connections of a given insertion event
+ batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
- /* Find the chunk that connects to the given insertion event */
- INNER JOIN chunk_events AS c
- ON i.next_chunk_id = c.chunk_id
- /* Get the depth of the chunk start event from the events table */
+ /* Find the batch that connects to the given insertion event */
+ INNER JOIN batch_events AS c
+ ON i.next_batch_id = c.batch_id
+ /* Get the depth of the batch start event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which matches the given event_id */
WHERE i.event_id = ?
@@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
- # Try and find any potential historical chunks of message history.
+ # Try and find any potential historical batches of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
- # find any chunk events connected to the insertion event (by
- # chunk_id). If we find any, we'll add them to the queue and
+ # find any batch events connected to the insertion event (by
+ # batch_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
@@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
- # Find any chunk connections for the given insertion event
+ # Find any batch connections for the given insertion event
txn.execute(
- chunk_connection_query,
+ batch_connection_query,
(connected_insertion_event, limit - len(event_results)),
)
- chunk_start_event_id_results = txn.fetchall()
+ batch_start_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events: chunk_start_event_id_results %s",
- chunk_start_event_id_results,
+ "_get_backfill_events: batch_start_event_id_results %s",
+ batch_start_event_id_results,
)
- for row in chunk_start_event_id_results:
+ for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index dec7e8594e..cc4e31ec30 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1276,13 +1276,6 @@ class PersistEventsStore:
logger.exception("")
raise
- # update the stored internal_metadata to update the "outlier" flag.
- # TODO: This is unused as of Synapse 1.31. Remove it once we are happy
- # to drop backwards-compatibility with 1.30.
- metadata_json = json_encoder.encode(event.internal_metadata.get_dict())
- sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
- txn.execute(sql, (metadata_json, event.event_id))
-
# Add an entry to the ex_outlier_stream table to replicate the
# change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering
@@ -1327,19 +1320,6 @@ class PersistEventsStore:
d.pop("redacted_because", None)
return d
- def get_internal_metadata(event):
- im = event.internal_metadata.get_dict()
-
- # temporary hack for database compatibility with Synapse 1.30 and earlier:
- # store the `outlier` flag inside the internal_metadata json as well as in
- # the `events` table, so that if anyone rolls back to an older Synapse,
- # things keep working. This can be removed once we are happy to drop support
- # for that
- if event.internal_metadata.is_outlier():
- im["outlier"] = True
-
- return im
-
self.db_pool.simple_insert_many_txn(
txn,
table="event_json",
@@ -1348,7 +1328,7 @@ class PersistEventsStore:
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": json_encoder.encode(
- get_internal_metadata(event)
+ event.internal_metadata.get_dict()
),
"json": json_encoder.encode(event_dict(event)),
"format_version": event.format_version,
@@ -1509,7 +1489,7 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
self._handle_insertion_event(txn, event)
- self._handle_chunk_event(txn, event)
+ self._handle_batch_event(txn, event)
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
@@ -1790,23 +1770,23 @@ class PersistEventsStore:
):
return
- next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
- if next_chunk_id is None:
- # Invalid insertion event without next chunk ID
+ next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
+ if next_batch_id is None:
+ # Invalid insertion event without next batch ID
return
logger.debug(
- "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+ "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
)
- # Keep track of the insertion event and the chunk ID
+ # Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "next_chunk_id": next_chunk_id,
+ "next_batch_id": next_batch_id,
},
)
@@ -1822,8 +1802,8 @@ class PersistEventsStore:
},
)
- def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
- """Handles inserting the chunk edges/connections between the chunk event
+ def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles inserting the batch edges/connections between the batch event
and an insertion event. Part of MSC2716.
Args:
@@ -1831,11 +1811,11 @@ class PersistEventsStore:
event: The event to process
"""
- if event.type != EventTypes.MSC2716_CHUNK:
- # Not a chunk event
+ if event.type != EventTypes.MSC2716_BATCH:
+ # Not a batch event
return
- # Skip processing a chunk event if the room version doesn't
+ # Skip processing a batch event if the room version doesn't
# support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
room_creator = self.db_pool.simple_select_one_onecol_txn(
@@ -1852,35 +1832,35 @@ class PersistEventsStore:
):
return
- chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
- if chunk_id is None:
- # Invalid chunk event without a chunk ID
+ batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
+ if batch_id is None:
+ # Invalid batch event without a batch ID
return
- logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+ logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
- # Keep track of the insertion event and the chunk ID
+ # Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
- table="chunk_events",
+ table="batch_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "chunk_id": chunk_id,
+ "batch_id": batch_id,
},
)
- # When we receive an event with a `chunk_id` referencing the
- # `next_chunk_id` of the insertion event, we can remove it from the
+ # When we receive an event with a `batch_id` referencing the
+ # `next_batch_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
- WHERE next_chunk_id = ?
+ WHERE next_batch_id = ?
)
"""
- txn.execute(sql, (chunk_id,))
+ txn.execute(sql, (batch_id,))
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d72e716b5c..4a1a2f4a6a 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1495,7 +1495,7 @@ class EventsWorkerStore(SQLBaseStore):
if not res:
raise SynapseError(404, "Could not find event %s" % (event_id,))
- return (int(res["topological_ordering"]), int(res["stream_ordering"]))
+ return int(res["topological_ordering"]), int(res["stream_ordering"])
async def get_next_event_to_expire(self) -> Optional[Tuple[str, int]]:
"""Retrieve the entry with the lowest expiry timestamp in the event_expiry
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index d213b26703..b76ee51a9b 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -63,7 +63,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
"""Generates current count of monthly active users broken down by service.
A service is typically an appservice but also includes native matrix users.
Since the `monthly_active_users` table is populated from the `user_ips` table,
- `config.track_appservice_user_ips` must be set to `true` for this
+ `config.appservice.track_appservice_user_ips` must be set to `true` for this
method to return anything other than native matrix users.
Returns:
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index bccff5e5b9..3eb30944bf 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -102,15 +102,19 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
(room_id,),
)
rows = txn.fetchall()
- max_depth = max(row[1] for row in rows)
-
- if max_depth < token.topological:
- # We need to ensure we don't delete all the events from the database
- # otherwise we wouldn't be able to send any events (due to not
- # having any backwards extremities)
- raise SynapseError(
- 400, "topological_ordering is greater than forward extremeties"
- )
+ # if we already have no forwards extremities (for example because they were
+ # cleared out by the `delete_old_current_state_events` background database
+ # update), then we may as well carry on.
+ if rows:
+ max_depth = max(row[1] for row in rows)
+
+ if max_depth < token.topological:
+ # We need to ensure we don't delete all the events from the database
+ # otherwise we wouldn't be able to send any events (due to not
+ # having any backwards extremities)
+ raise SynapseError(
+ 400, "topological_ordering is greater than forward extremities"
+ )
logger.info("[purge] looking for events to delete")
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index edeaacd7a6..01a4281301 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Tuple
from twisted.internet import defer
@@ -153,12 +153,12 @@ class ReceiptsWorkerStore(SQLBaseStore):
}
async def get_linearized_receipts_for_rooms(
- self, room_ids: List[str], to_key: int, from_key: Optional[int] = None
+ self, room_ids: Iterable[str], to_key: int, from_key: Optional[int] = None
) -> List[dict]:
"""Get receipts for multiple rooms for sending to clients.
Args:
- room_id: List of room_ids.
+ room_ids: The room IDs to fetch receipts of.
to_key: Max stream id to fetch receipts up to.
from_key: Min stream id to fetch receipts from. None fetches
from the start.
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index fafadb88fc..c83089ee63 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -388,7 +388,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"get_users_expiring_soon",
select_users_txn,
self._clock.time_msec(),
- self.config.account_validity_renew_at,
+ self.config.account_validity.account_validity_renew_at,
)
async def set_renewal_mail_status(self, user_id: str, email_sent: bool) -> None:
@@ -2015,7 +2015,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
(user_id_obj.localpart, create_profile_with_displayname),
)
- if self.hs.config.stats_enabled:
+ if self.hs.config.stats.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 54fa361d3e..a383388757 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore
class RoomBatchStore(SQLBaseStore):
- async def get_insertion_event_by_chunk_id(self, chunk_id: str) -> Optional[str]:
+ async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
"""Retrieve a insertion event ID.
Args:
- chunk_id: The chunk ID of the insertion event to retrieve.
+ batch_id: The batch ID of the insertion event to retrieve.
Returns:
The event_id of an insertion event, or None if there is no known
@@ -30,7 +30,7 @@ class RoomBatchStore(SQLBaseStore):
"""
return await self.db_pool.simple_select_one_onecol(
table="insertion_events",
- keyvalues={"next_chunk_id": chunk_id},
+ keyvalues={"next_batch_id": batch_id},
retcol="event_id",
allow_none=True,
)
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 9beeb96aa9..ddb162a4fc 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
if (
self.hs.config.worker.run_background_tasks
- and self.hs.config.metrics_flags.known_servers
+ and self.hs.config.metrics.metrics_flags.known_servers
):
self._known_servers_count = 1
self.hs.get_clock().looping_call(
@@ -162,7 +162,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._check_safe_current_state_events_membership_updated_txn,
)
- @cached(max_entries=100000, iterable=True)
+ @cached(max_entries=100000, iterable=True, prune_unread_entries=False)
async def get_users_in_room(self, room_id: str) -> List[str]:
return await self.db_pool.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
@@ -439,7 +439,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return results_dict.get("membership"), results_dict.get("event_id")
- @cached(max_entries=500000, iterable=True)
+ @cached(max_entries=500000, iterable=True, prune_unread_entries=False)
async def get_rooms_for_user_with_stream_ordering(
self, user_id: str
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
@@ -544,7 +544,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
return frozenset(r.room_id for r in rooms)
- @cached(max_entries=500000, cache_context=True, iterable=True)
+ @cached(
+ max_entries=500000,
+ cache_context=True,
+ iterable=True,
+ prune_unread_entries=False,
+ )
async def get_users_who_share_room_with_user(
self, user_id: str, cache_context: _CacheContext
) -> Set[str]:
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 6480d5a9f5..2a1e99e17a 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -15,12 +15,12 @@
import logging
import re
from collections import namedtuple
-from typing import Collection, List, Optional, Set
+from typing import Collection, Iterable, List, Optional, Set
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -32,14 +32,24 @@ SearchEntry = namedtuple(
)
+def _clean_value_for_search(value: str) -> str:
+ """
+ Replaces any null code points in the string with spaces as
+ Postgres and SQLite do not like the insertion of strings with
+ null code points into the full-text search tables.
+ """
+ return value.replace("\u0000", " ")
+
+
class SearchWorkerStore(SQLBaseStore):
- def store_search_entries_txn(self, txn, entries):
+ def store_search_entries_txn(
+ self, txn: LoggingTransaction, entries: Iterable[SearchEntry]
+ ) -> None:
"""Add entries to the search table
Args:
- txn (cursor):
- entries (iterable[SearchEntry]):
- entries to be added to the table
+ txn:
+ entries: entries to be added to the table
"""
if not self.hs.config.enable_search:
return
@@ -55,7 +65,7 @@ class SearchWorkerStore(SQLBaseStore):
entry.event_id,
entry.room_id,
entry.key,
- entry.value,
+ _clean_value_for_search(entry.value),
entry.stream_ordering,
entry.origin_server_ts,
)
@@ -70,11 +80,16 @@ class SearchWorkerStore(SQLBaseStore):
" VALUES (?,?,?,?)"
)
args = (
- (entry.event_id, entry.room_id, entry.key, entry.value)
+ (
+ entry.event_id,
+ entry.room_id,
+ entry.key,
+ _clean_value_for_search(entry.value),
+ )
for entry in entries
)
-
txn.execute_batch(sql, args)
+
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
@@ -646,6 +661,7 @@ class SearchStore(SearchBackgroundUpdateStore):
for key in ("body", "name", "topic"):
v = event.content.get(key, None)
if v:
+ v = _clean_value_for_search(v)
values.append(v)
if not values:
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index bff7d0404f..a89747d741 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -58,7 +58,7 @@ class StateDeltasStore(SQLBaseStore):
# if the CSDs haven't changed between prev_stream_id and now, we
# know for certain that they haven't changed between prev_stream_id and
# max_stream_id.
- return (max_stream_id, [])
+ return max_stream_id, []
def get_current_state_deltas_txn(txn):
# First we calculate the max stream id that will give us less than
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 343d6efc92..e20033bb28 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -98,7 +98,7 @@ class StatsStore(StateDeltasStore):
self.server_name = hs.hostname
self.clock = self.hs.get_clock()
- self.stats_enabled = hs.config.stats_enabled
+ self.stats_enabled = hs.config.stats.stats_enabled
self.stats_delta_processing_lock = DeferredLock()
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 959f13de47..dc7884b1c0 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -39,6 +39,8 @@ import logging
from collections import namedtuple
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
+from frozendict import frozendict
+
from twisted.internet import defer
from synapse.api.filtering import Filter
@@ -379,7 +381,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
if p > min_pos
}
- return RoomStreamToken(None, min_pos, positions)
+ return RoomStreamToken(None, min_pos, frozendict(positions))
async def get_room_events_stream_for_rooms(
self,
@@ -622,7 +624,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
self._set_before_and_after(events, rows)
- return (events, token)
+ return events, token
async def get_recent_event_ids_for_room(
self, room_id: str, limit: int, end_token: RoomStreamToken
@@ -1240,7 +1242,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
self._set_before_and_after(events, rows)
- return (events, token)
+ return events, token
@cached()
async def get_id_for_instance(self, instance_name: str) -> int:
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 8aebdc2817..90d65edc42 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -14,14 +14,28 @@
import logging
import re
-from typing import Any, Dict, Iterable, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ cast,
+)
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.state import StateFilter
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import get_domain_from_id, get_localpart_from_id
+from synapse.storage.types import Connection
+from synapse.types import JsonDict, get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@@ -36,7 +50,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# add_users_who_share_private_rooms?
SHARE_PRIVATE_WORKING_SET = 500
- def __init__(self, database: DatabasePool, db_conn, hs):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: Connection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self.server_name = hs.hostname
@@ -57,10 +76,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
)
- async def _populate_user_directory_createtables(self, progress, batch_size):
+ async def _populate_user_directory_createtables(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
# Get all the rooms that we want to process.
- def _make_staging_area(txn):
+ def _make_staging_area(txn: LoggingTransaction) -> None:
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
@@ -85,19 +106,17 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
del rooms
- # If search all users is on, get all the users we want to add.
- if self.hs.config.user_directory_search_all_users:
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_users(user_id TEXT NOT NULL)"
- )
- txn.execute(sql)
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_users(user_id TEXT NOT NULL)"
+ )
+ txn.execute(sql)
- txn.execute("SELECT name FROM users")
- users = [{"user_id": x[0]} for x in txn.fetchall()]
+ txn.execute("SELECT name FROM users")
+ users = [{"user_id": x[0]} for x in txn.fetchall()]
- self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+ self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
new_pos = await self.get_max_stream_id_in_current_state_deltas()
await self.db_pool.runInteraction(
@@ -112,16 +131,20 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
return 1
- async def _populate_user_directory_cleanup(self, progress, batch_size):
+ async def _populate_user_directory_cleanup(
+ self,
+ progress: JsonDict,
+ batch_size: int,
+ ) -> int:
"""
Update the user directory stream position, then clean up the old tables.
"""
position = await self.db_pool.simple_select_one_onecol(
- TEMP_TABLE + "_position", None, "position"
+ TEMP_TABLE + "_position", {}, "position"
)
await self.update_user_directory_stream_pos(position)
- def _delete_staging_area(txn):
+ def _delete_staging_area(txn: LoggingTransaction) -> None:
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
@@ -135,18 +158,32 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
return 1
- async def _populate_user_directory_process_rooms(self, progress, batch_size):
+ async def _populate_user_directory_process_rooms(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
"""
+ Rescan the state of all rooms so we can track
+
+ - who's in a public room;
+ - which local users share a private room with other users (local
+ and remote); and
+ - who should be in the user_directory.
+
Args:
progress (dict)
batch_size (int): Maximum number of state events to process
per cycle.
+
+ Returns:
+ number of events processed.
"""
# If we don't have progress filed, delete everything.
if not progress:
await self.delete_all_from_user_dir()
- def _get_next_batch(txn):
+ def _get_next_batch(
+ txn: LoggingTransaction,
+ ) -> Optional[Sequence[Tuple[str, int]]]:
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """
@@ -157,7 +194,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
TEMP_TABLE + "_rooms",
)
txn.execute(sql)
- rooms_to_work_on = txn.fetchall()
+ rooms_to_work_on = cast(List[Tuple[str, int]], txn.fetchall())
if not rooms_to_work_on:
return None
@@ -165,7 +202,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
- progress["remaining"] = txn.fetchone()[0]
+ result = txn.fetchone()
+ assert result is not None
+ progress["remaining"] = result[0]
return rooms_to_work_on
@@ -263,34 +302,33 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return processed_event_count
- async def _populate_user_directory_process_users(self, progress, batch_size):
+ async def _populate_user_directory_process_users(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
"""
- If search_all_users is enabled, add all of the users to the user directory.
+ Add all local users to the user directory.
"""
- if not self.hs.config.user_directory_search_all_users:
- await self.db_pool.updates._end_background_update(
- "populate_user_directory_process_users"
- )
- return 1
- def _get_next_batch(txn):
+ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
sql = "SELECT user_id FROM %s LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
- users_to_work_on = txn.fetchall()
+ user_result = cast(List[Tuple[str]], txn.fetchall())
- if not users_to_work_on:
+ if not user_result:
return None
- users_to_work_on = [x[0] for x in users_to_work_on]
+ users_to_work_on = [x[0] for x in user_result]
# Get how many are left to process, so we can give status on how
# far we are in processing
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
- progress["remaining"] = txn.fetchone()[0]
+ count_result = txn.fetchone()
+ assert count_result is not None
+ progress["remaining"] = count_result[0]
return users_to_work_on
@@ -331,7 +369,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return len(users_to_work_on)
- async def is_room_world_readable_or_publicly_joinable(self, room_id):
+ async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""
# Create a state filter that only queries join and history state event
@@ -375,7 +413,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if not isinstance(avatar_url, str):
avatar_url = None
- def _update_profile_in_user_dir_txn(txn):
+ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_upsert_txn(
txn,
table="user_directory",
@@ -442,7 +480,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
for user_id, other_user_id in user_id_tuples
],
value_names=(),
- value_values=None,
+ value_values=(),
desc="add_users_who_share_room",
)
@@ -461,14 +499,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
key_names=["user_id", "room_id"],
key_values=[(user_id, room_id) for user_id in user_ids],
value_names=(),
- value_values=None,
+ value_values=(),
desc="add_users_in_public_rooms",
)
async def delete_all_from_user_dir(self) -> None:
"""Delete the entire user directory"""
- def _delete_all_from_user_dir_txn(txn):
+ def _delete_all_from_user_dir_txn(txn: LoggingTransaction) -> None:
txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search")
txn.execute("DELETE FROM users_in_public_rooms")
@@ -480,7 +518,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
@cached()
- async def get_user_in_directory(self, user_id: str) -> Optional[Dict[str, Any]]:
+ async def get_user_in_directory(self, user_id: str) -> Optional[Dict[str, str]]:
return await self.db_pool.simple_select_one(
table="user_directory",
keyvalues={"user_id": user_id},
@@ -504,16 +542,21 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
# add_users_who_share_private_rooms?
SHARE_PRIVATE_WORKING_SET = 500
- def __init__(self, database: DatabasePool, db_conn, hs):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: Connection,
+ hs: "HomeServer",
+ ) -> None:
super().__init__(database, db_conn, hs)
self._prefer_local_users_in_search = (
- hs.config.user_directory_search_prefer_local_users
+ hs.config.userdirectory.user_directory_search_prefer_local_users
)
self._server_name = hs.config.server.server_name
async def remove_from_user_dir(self, user_id: str) -> None:
- def _remove_from_user_dir_txn(txn):
+ def _remove_from_user_dir_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_delete_txn(
txn, table="user_directory", keyvalues={"user_id": user_id}
)
@@ -539,7 +582,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
"remove_from_user_dir", _remove_from_user_dir_txn
)
- async def get_users_in_dir_due_to_room(self, room_id):
+ async def get_users_in_dir_due_to_room(self, room_id: str) -> Set[str]:
"""Get all user_ids that are in the room directory because they're
in the given room_id
"""
@@ -572,7 +615,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
room_id
"""
- def _remove_user_who_share_room_txn(txn):
+ def _remove_user_who_share_room_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_delete_txn(
txn,
table="users_who_share_private_rooms",
@@ -593,7 +636,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
"remove_user_who_share_room", _remove_user_who_share_room_txn
)
- async def get_user_dir_rooms_user_is_in(self, user_id):
+ async def get_user_dir_rooms_user_is_in(self, user_id: str) -> List[str]:
"""
Returns the rooms that a user is in.
@@ -635,7 +678,9 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
A set of room ID's that the users share.
"""
- def _get_shared_rooms_for_users_txn(txn):
+ def _get_shared_rooms_for_users_txn(
+ txn: LoggingTransaction,
+ ) -> List[Dict[str, str]]:
txn.execute(
"""
SELECT p1.room_id
@@ -676,7 +721,9 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
desc="get_user_directory_stream_pos",
)
- async def search_user_dir(self, user_id, search_term, limit):
+ async def search_user_dir(
+ self, user_id: str, search_term: str, limit: int
+ ) -> JsonDict:
"""Searches for users in directory
Returns:
@@ -694,7 +741,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
}
"""
- if self.hs.config.user_directory_search_all_users:
+ if self.hs.config.userdirectory.user_directory_search_all_users:
join_args = (user_id,)
where_clause = "user_id != ?"
else:
@@ -712,7 +759,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
# We allow manipulating the ranking algorithm by injecting statements
# based on config options.
additional_ordering_statements = []
- ordering_arguments = ()
+ ordering_arguments: Tuple[str, ...] = ()
if isinstance(self.database_engine, PostgresEngine):
full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
@@ -818,7 +865,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
return {"limited": limited, "results": results}
-def _parse_query_sqlite(search_term):
+def _parse_query_sqlite(search_term: str) -> str:
"""Takes a plain unicode string from the user and converts it into a form
that can be passed to the database.
We use this so that we can add prefix matching, which isn't something
@@ -833,7 +880,7 @@ def _parse_query_sqlite(search_term):
return " & ".join("(%s* OR %s)" % (result, result) for result in results)
-def _parse_query_postgres(search_term):
+def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
"""Takes a plain unicode string from the user and converts it into a form
that can be passed to the database.
We use this so that we can add prefix matching, which isn't something
|