diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 95d2caff62..0084d9f96c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -280,18 +280,18 @@ class LoggingTransaction:
else:
self.executemany(sql, args)
- def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
+ def execute_values(self, sql: str, *args: Any, fetch: bool = True) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
using postgres.
- Always sets fetch=True when caling `execute_values`, so will return the
- results.
+ The `fetch` parameter must be set to False if the query does not return
+ rows (e.g. INSERTs).
+
+ `sql` and `*args` are handed to `self._do_execute` together with a
+ callable that invokes psycopg2's `execute_values` on `self.txn`, passing
+ `fetch` through as its `fetch` keyword argument.
+
+ NOTE(review): psycopg2's `execute_values` returns None instead of a list
+ when `fetch` is False, so the `List[Tuple]` annotation presumably only
+ holds for `fetch=True` -- confirm against `_do_execute`.
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values # type: ignore
return self._do_execute(
- lambda *x: execute_values(self.txn, *x, fetch=True), sql, *args
+ lambda *x: execute_values(self.txn, *x, fetch=fetch), sql, *args
)
def execute(self, sql: str, *args: Any) -> None:
@@ -920,13 +920,23 @@ class DatabasePool:
if k != keys[0]:
raise RuntimeError("All items must have the same keys")
- sql = "INSERT INTO %s (%s) VALUES(%s)" % (
- table,
- ", ".join(k for k in keys[0]),
- ", ".join("?" for _ in keys[0]),
- )
+ if isinstance(txn.database_engine, PostgresEngine):
+ # We use `execute_values` as it can be a lot faster than `execute_batch`,
+ # but it's only available on postgres.
+ sql = "INSERT INTO %s (%s) VALUES ?" % (
+ table,
+ ", ".join(k for k in keys[0]),
+ )
- txn.execute_batch(sql, vals)
+ txn.execute_values(sql, vals, fetch=False)
+ else:
+ sql = "INSERT INTO %s (%s) VALUES(%s)" % (
+ table,
+ ", ".join(k for k in keys[0]),
+ ", ".join("?" for _ in keys[0]),
+ )
+
+ txn.execute_batch(sql, vals)
async def simple_upsert(
self,
@@ -1281,20 +1291,33 @@ class DatabasePool:
k + "=EXCLUDED." + k for k in value_names
)
- sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
- table,
- ", ".join(k for k in allnames),
- ", ".join("?" for _ in allnames),
- ", ".join(key_names),
- latter,
- )
-
args = []
for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))
- return txn.execute_batch(sql, args)
+ if isinstance(txn.database_engine, PostgresEngine):
+ # We use `execute_values` as it can be a lot faster than `execute_batch`,
+ # but it's only available on postgres.
+ sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
+ table,
+ ", ".join(k for k in allnames),
+ ", ".join(key_names),
+ latter,
+ )
+
+ txn.execute_values(sql, args, fetch=False)
+
+ else:
+ sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
+ table,
+ ", ".join(k for k in allnames),
+ ", ".join("?" for _ in allnames),
+ ", ".join(key_names),
+ latter,
+ )
+
+ return txn.execute_batch(sql, args)
@overload
async def simple_select_one(
diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py
index 86075bc55b..6daf8b8ffb 100644
--- a/synapse/storage/databases/main/directory.py
+++ b/synapse/storage/databases/main/directory.py
@@ -75,8 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore):
desc="get_aliases_for_room",
)
-
-class DirectoryStore(DirectoryWorkerStore):
async def create_room_alias_association(
self,
room_alias: RoomAlias,
@@ -126,6 +124,8 @@ class DirectoryStore(DirectoryWorkerStore):
409, "Room alias %s already exists" % room_alias.to_string()
)
+
+class DirectoryStore(DirectoryWorkerStore):
async def delete_room_alias(self, room_alias: RoomAlias) -> str:
room_id = await self.db_pool.runInteraction(
"delete_room_alias", self._delete_room_alias_txn, room_alias
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 40b53274fb..8e691678e5 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -575,7 +575,13 @@ class PersistEventsStore:
missing_auth_chains.clear()
- for auth_id, event_type, state_key, chain_id, sequence_number in txn:
+ for (
+ auth_id,
+ event_type,
+ state_key,
+ chain_id,
+ sequence_number,
+ ) in txn.fetchall():
event_to_types[auth_id] = (event_type, state_key)
if chain_id is None:
@@ -1379,18 +1385,18 @@ class PersistEventsStore:
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
- sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?"
- txn.execute_batch(
- sql,
- (
- (
- False,
- event.event_id,
- )
- for event, _ in events_and_contexts
- if not event.internal_metadata.is_redacted()
- ),
+ unredacted_events = [
+ event.event_id
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_redacted()
+ ]
+ sql = "UPDATE redactions SET have_censored = ? WHERE "
+ clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "redacts",
+ unredacted_events,
)
+ txn.execute(sql + clause, [False] + args)
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
@@ -1541,35 +1547,32 @@ class PersistEventsStore:
to_prefill = []
rows = []
- N = 200
- for i in range(0, len(events_and_contexts), N):
- ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
- if not ev_map:
- break
-
- sql = (
- "SELECT "
- " e.event_id as event_id, "
- " r.redacts as redacts,"
- " rej.event_id as rejects "
- " FROM events as e"
- " LEFT JOIN rejections as rej USING (event_id)"
- " LEFT JOIN redactions as r ON e.event_id = r.redacts"
- " WHERE "
- )
- clause, args = make_in_list_sql_clause(
- self.database_engine, "e.event_id", list(ev_map)
- )
+ ev_map = {e.event_id: e for e, _ in events_and_contexts}
+ if not ev_map:
+ return
- txn.execute(sql + clause, args)
- rows = self.db_pool.cursor_to_dict(txn)
- for row in rows:
- event = ev_map[row["event_id"]]
- if not row["rejects"] and not row["redacts"]:
- to_prefill.append(
- _EventCacheEntry(event=event, redacted_event=None)
- )
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE "
+ )
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "e.event_id", list(ev_map)
+ )
+
+ txn.execute(sql + clause, args)
+ rows = self.db_pool.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(event=event, redacted_event=None))
def prefill():
for cache_entry in to_prefill:
@@ -1770,10 +1773,21 @@ class PersistEventsStore:
# Not a insertion event
return
- # Skip processing a insertion event if the room version doesn't
- # support it.
+ # Skip processing an insertion event if the room version doesn't
+ # support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
- if not room_version.msc2716_historical:
+ room_creator = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": event.room_id},
+ retcol="creator",
+ allow_none=True,
+ )
+ if (
+ not room_version.msc2716_historical
+ or not self.hs.config.experimental.msc2716_enabled
+ or event.sender != room_creator
+ ):
return
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
@@ -1822,9 +1836,20 @@ class PersistEventsStore:
return
# Skip processing a chunk event if the room version doesn't
- # support it.
+ # support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
- if not room_version.msc2716_historical:
+ room_creator = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": event.room_id},
+ retcol="creator",
+ allow_none=True,
+ )
+ if (
+ not room_version.msc2716_historical
+ or not self.hs.config.experimental.msc2716_enabled
+ or event.sender != room_creator
+ ):
return
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
@@ -1962,6 +1987,15 @@ class PersistEventsStore:
events_and_context.
"""
+ # Only non-outlier events will have push actions associated with them,
+ # so filter the outliers out. (This makes joining large rooms faster, as
+ # these queries took seconds to process all the state events).
+ non_outlier_events = [
+ event
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_outlier()
+ ]
+
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
@@ -1972,7 +2006,7 @@ class PersistEventsStore:
WHERE event_id = ?
"""
- if events_and_contexts:
+ if non_outlier_events:
txn.execute_batch(
sql,
(
@@ -1982,12 +2016,12 @@ class PersistEventsStore:
event.depth,
event.event_id,
)
- for event, _ in events_and_contexts
+ for event in non_outlier_events
),
)
room_to_event_ids: Dict[str, List[str]] = {}
- for e, _ in events_and_contexts:
+ for e in non_outlier_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
for room_id, event_ids in room_to_event_ids.items():
@@ -2012,7 +2046,11 @@ class PersistEventsStore:
# persisted.
txn.execute_batch(
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
- ((event.event_id,) for event, _ in all_events_and_contexts),
+ (
+ (event.event_id,)
+ for event, _ in all_events_and_contexts
+ if not event.internal_metadata.is_outlier()
+ ),
)
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 1388771c40..12cf6995eb 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -29,7 +29,26 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
-class PresenceStore(SQLBaseStore):
+class PresenceBackgroundUpdateStore(SQLBaseStore):
+ """Store that registers the background index update for `presence_stream`.
+
+ On construction it registers the "presence_stream_not_offline_index"
+ background update, which describes the
+ `presence_stream_state_not_offline_idx` index on the `state` column of
+ `presence_stream`, restricted by the clause `state != 'offline'`.
+ """
+
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: Connection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ # Used by `PresenceStore._get_active_presence()`
+ self.db_pool.updates.register_background_index_update(
+ "presence_stream_not_offline_index",
+ index_name="presence_stream_state_not_offline_idx",
+ table="presence_stream",
+ columns=["state"],
+ where_clause="state != 'offline'",
+ )
+
+
+class PresenceStore(PresenceBackgroundUpdateStore):
def __init__(
self,
database: DatabasePool,
@@ -332,6 +351,8 @@ class PresenceStore(SQLBaseStore):
the appropriate time outs.
"""
+ # The `presence_stream_state_not_offline_idx` index should be used for this
+ # query.
sql = (
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index f98b892598..6e7312266d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -19,9 +19,10 @@ from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
-from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.constants import EventContentFields, EventTypes, JoinRules
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion, RoomVersions
+from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchStore
@@ -1013,6 +1014,7 @@ class _BackgroundUpdates:
ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
+ POPULATE_ROOMS_CREATOR_COLUMN = "populate_rooms_creator_column"
_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
@@ -1054,6 +1056,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
self._background_replace_room_depth_min_depth,
)
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
+ self._background_populate_rooms_creator_column,
+ )
+
async def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
them into the room_retention table.
@@ -1273,7 +1280,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
keyvalues={"room_id": room_id},
retcol="MAX(stream_ordering)",
allow_none=True,
- desc="upsert_room_on_join",
+ desc="has_auth_chain_index_fallback",
)
return max_ordering is None
@@ -1343,6 +1350,65 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
return 0
+    async def _background_populate_rooms_creator_column(
+        self, progress: dict, batch_size: int
+    ):
+        """Background update to go and add creator information to `rooms`
+        table from `current_state_events` table.
+
+        Args:
+            progress: Persisted progress of this update. The `room_id` key
+                holds the last room handled by the previous batch; a missing
+                key means the scan starts from the beginning.
+            batch_size: Maximum number of rooms to process in this batch.
+
+        Returns:
+            `batch_size`, regardless of how many rooms were actually updated.
+        """
+
+        last_room_id = progress.get("room_id", "")
+
+        def _background_populate_rooms_creator_column_txn(txn: LoggingTransaction):
+            # Fetch the current `m.room.create` event of every room after
+            # `last_room_id` whose `creator` column is still NULL or empty,
+            # in room_id order so progress can be resumed.
+            sql = """
+                SELECT room_id, json FROM event_json
+                INNER JOIN rooms AS room USING (room_id)
+                INNER JOIN current_state_events AS state_event USING (room_id, event_id)
+                WHERE room_id > ? AND (room.creator IS NULL OR room.creator = '') AND state_event.type = 'm.room.create' AND state_event.state_key = ''
+                ORDER BY room_id
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_room_id, batch_size))
+            room_id_to_create_event_results = txn.fetchall()
+
+            new_last_room_id = ""
+            for room_id, event_json in room_id_to_create_event_results:
+                event_dict = db_to_json(event_json)
+
+                # Guard against a create event whose JSON has a missing or
+                # null `content`: calling `.get` on None would raise and
+                # abort the whole batch transaction.
+                content = event_dict.get("content") or {}
+                creator = content.get(EventContentFields.ROOM_CREATOR)
+
+                self.db_pool.simple_update_txn(
+                    txn,
+                    table="rooms",
+                    keyvalues={"room_id": room_id},
+                    updatevalues={"creator": creator},
+                )
+                new_last_room_id = room_id
+
+            # No rows were returned, so there is nothing left to populate.
+            if new_last_room_id == "":
+                return True
+
+            # Record where this batch stopped so the next one resumes after it.
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
+                {"room_id": new_last_room_id},
+            )
+
+            return False
+
+        end = await self.db_pool.runInteraction(
+            "_background_populate_rooms_creator_column",
+            _background_populate_rooms_creator_column_txn,
+        )
+
+        if end:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN
+            )
+
+        return batch_size
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@@ -1350,7 +1416,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
self.config = hs.config
- async def upsert_room_on_join(self, room_id: str, room_version: RoomVersion):
+ async def upsert_room_on_join(
+ self, room_id: str, room_version: RoomVersion, auth_events: List[EventBase]
+ ):
"""Ensure that the room is stored in the table
Called when we join a room over federation, and overwrites any room version
@@ -1361,6 +1429,24 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
# mark the room as having an auth chain cover index.
has_auth_chain_index = await self.has_auth_chain_index(room_id)
+ create_event = None
+ for e in auth_events:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ create_event = e
+ break
+
+ if create_event is None:
+ # If the state doesn't have a create event then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise StoreError(400, "No create event in state")
+
+ room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
+
+ if not isinstance(room_creator, str):
+ # If the create event does not have a creator then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise StoreError(400, "No creator defined on the create event")
+
await self.db_pool.simple_upsert(
desc="upsert_room_on_join",
table="rooms",
@@ -1368,7 +1454,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
values={"room_version": room_version.identifier},
insertion_values={
"is_public": False,
- "creator": "",
+ "creator": room_creator,
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
@@ -1396,6 +1482,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
insertion_values={
"room_version": room_version.identifier,
"is_public": False,
+ # We don't worry about setting the `creator` here because
+ # we don't process any messages in a room while a user is
+ # invited (only after the join).
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 42edbcc057..4245fa1a3c 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -22,7 +22,7 @@ from typing_extensions import Counter
from twisted.internet.defer import DeferredLock
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.errors import StoreError
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.state_deltas import StateDeltasStore
@@ -590,7 +590,7 @@ class StatsStore(StateDeltasStore):
room_state["canonical_alias"] = event.content.get("alias")
elif event.type == EventTypes.Create:
room_state["is_federatable"] = (
- event.content.get("m.federate", True) is True
+ event.content.get(EventContentFields.FEDERATE, True) is True
)
await self.update_room_state(room_id, room_state)
diff --git a/synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql b/synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql
new file mode 100644
index 0000000000..f7c0b31261
--- /dev/null
+++ b/synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Queue the `populate_rooms_creator_column` background update.
+INSERT INTO background_updates (ordering, update_name, progress_json)
+ VALUES (6302, 'populate_rooms_creator_column', '{}');
diff --git a/synapse/storage/schema/main/delta/63/04add_presence_stream_not_offline_index.sql b/synapse/storage/schema/main/delta/63/04add_presence_stream_not_offline_index.sql
new file mode 100644
index 0000000000..b90856004b
--- /dev/null
+++ b/synapse/storage/schema/main/delta/63/04add_presence_stream_not_offline_index.sql
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2021 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Queue the `presence_stream_not_offline_index` background update.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (6304, 'presence_stream_not_offline_index', '{}');
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index c768fdea56..6f7cbe40f4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -19,6 +19,7 @@ from contextlib import contextmanager
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
import attr
+from sortedcontainers import SortedSet
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
@@ -240,7 +241,7 @@ class MultiWriterIdGenerator:
# Set of local IDs that we're still processing. The current position
# should be less than the minimum of this set (if not empty).
- self._unfinished_ids: Set[int] = set()
+ self._unfinished_ids: SortedSet[int] = SortedSet()
# Set of local IDs that we've processed that are larger than the current
# position, due to there being smaller unpersisted IDs.
@@ -473,7 +474,7 @@ class MultiWriterIdGenerator:
finished = set()
- min_unfinshed = min(self._unfinished_ids)
+ min_unfinshed = self._unfinished_ids[0]
for s in self._finished_ids:
if s < min_unfinshed:
if new_cur is None or new_cur < s:
|