diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 1dc347f0c9..5c21402dea 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -61,6 +61,7 @@ from .registration import RegistrationStore
from .rejections import RejectionsStore
from .relations import RelationsStore
from .room import RoomStore
+from .room_batch import RoomBatchStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .session import SessionStore
@@ -81,6 +82,7 @@ class DataStore(
EventsBackgroundUpdatesStore,
RoomMemberStore,
RoomStore,
+ RoomBatchStore,
RegistrationStore,
StreamStore,
ProfileStore,
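
`DataStore` is assembled by multiple inheritance: each per-domain store class contributes its methods, so registering the new `RoomBatchStore` only needs the import above plus a slot in the base-class list. A minimal sketch of that composition pattern, with hypothetical class and method names rather than Synapse's real internals:

```python
# Illustrative only: toy stand-ins for Synapse's per-domain store classes.
class RoomStoreSketch:
    def get_room(self, room_id: str) -> str:
        return f"room {room_id}"


class RoomBatchStoreSketch:
    def get_insertion_event_by_batch_id(self, batch_id: str) -> str:
        return f"insertion event for batch {batch_id}"


class DataStoreSketch(RoomStoreSketch, RoomBatchStoreSketch):
    """Mixes the per-domain stores together, as DataStore does above."""


store = DataStoreSketch()
print(store.get_insertion_event_by_batch_id("abc123"))
```
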
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 1d02795f43..d0cf3460da 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -494,7 +494,7 @@ class AccountDataWorkerStore(SQLBaseStore):
txn,
table="ignored_users",
column="ignored_user_id",
- iterable=previously_ignored_users - currently_ignored_users,
+ values=previously_ignored_users - currently_ignored_users,
keyvalues={"ignorer_user_id": user_id},
)
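
This is the first of several hunks renaming the `iterable` keyword argument to `values` on the `simple_delete_many_txn` helper, which deletes every row whose `column` is in the given collection and whose `keyvalues` all match. A hedged sketch of roughly what such a helper executes; the signature and SQL generation approximate Synapse's helper rather than reproduce it:

```python
import sqlite3
from typing import Any, Collection, Dict, List


def simple_delete_many_txn_sketch(
    txn: sqlite3.Cursor,
    table: str,
    column: str,
    values: Collection[Any],
    keyvalues: Dict[str, Any],
) -> int:
    """DELETE rows where `column` IN `values` and all `keyvalues` match."""
    if not values:
        return 0
    clauses = ["%s IN (%s)" % (column, ", ".join("?" for _ in values))]
    args: List[Any] = list(values)
    for key, value in keyvalues.items():
        clauses.append("%s = ?" % key)
        args.append(value)
    txn.execute("DELETE FROM %s WHERE %s" % (table, " AND ".join(clauses)), args)
    return txn.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ignored_users (ignorer_user_id TEXT, ignored_user_id TEXT)")
conn.executemany(
    "INSERT INTO ignored_users VALUES (?, ?)",
    [("@a:hs", "@x:hs"), ("@a:hs", "@y:hs"), ("@b:hs", "@x:hs")],
)
deleted = simple_delete_many_txn_sketch(
    conn.cursor(), "ignored_users", "ignored_user_id",
    ["@x:hs"], {"ignorer_user_id": "@a:hs"},
)
print(deleted)  # 1
```
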
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 047782eb06..10184d6ae7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
LIMIT ?
"""
- # Find any chunk connections of a given insertion event
- chunk_connection_query = """
+ # Find any batch connections of a given insertion event
+ batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
- /* Find the chunk that connects to the given insertion event */
- INNER JOIN chunk_events AS c
- ON i.next_chunk_id = c.chunk_id
- /* Get the depth of the chunk start event from the events table */
+ /* Find the batch that connects to the given insertion event */
+ INNER JOIN batch_events AS c
+ ON i.next_batch_id = c.batch_id
+ /* Get the depth of the batch start event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which matches the given event_id */
WHERE i.event_id = ?
@@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
- # Try and find any potential historical chunks of message history.
+ # Try and find any potential historical batches of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
- # find any chunk events connected to the insertion event (by
- # chunk_id). If we find any, we'll add them to the queue and
+ # find any batch events connected to the insertion event (by
+ # batch_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
@@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
- # Find any chunk connections for the given insertion event
+ # Find any batch connections for the given insertion event
txn.execute(
- chunk_connection_query,
+ batch_connection_query,
(connected_insertion_event, limit - len(event_results)),
)
- chunk_start_event_id_results = txn.fetchall()
+ batch_start_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events: chunk_start_event_id_results %s",
- chunk_start_event_id_results,
+ "_get_backfill_events: batch_start_event_id_results %s",
+ batch_start_event_id_results,
)
- for row in chunk_start_event_id_results:
+ for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
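
The surrounding traversal walks backfill candidates deepest-first: each event is pushed as `(-depth, event_id)` so that Python's min-heap `PriorityQueue` pops the largest depth first, and the batch rename leaves that ordering untouched. A small illustration of the negated-depth trick:

```python
from queue import PriorityQueue

# PriorityQueue pops the smallest tuple first, so negating the depth
# makes the deepest event come out of the queue first.
queue: PriorityQueue = PriorityQueue()
for depth, event_id in [(3, "$a"), (7, "$b"), (5, "$c")]:
    queue.put((-depth, event_id))

while not queue.empty():
    neg_depth, event_id = queue.get()
    print(-neg_depth, event_id)
# 7 $b
# 5 $c
# 3 $a
```
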
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 8e691678e5..584f818ff3 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -667,7 +667,7 @@ class PersistEventsStore:
table="event_auth_chain_to_calculate",
keyvalues={},
column="event_id",
- iterable=new_chain_tuples,
+ values=new_chain_tuples,
)
# Now we need to calculate any new links between chains caused by
@@ -1509,7 +1509,7 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
self._handle_insertion_event(txn, event)
- self._handle_chunk_event(txn, event)
+ self._handle_batch_event(txn, event)
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
@@ -1790,23 +1790,23 @@ class PersistEventsStore:
):
return
- next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
- if next_chunk_id is None:
- # Invalid insertion event without next chunk ID
+ next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
+ if next_batch_id is None:
+ # Invalid insertion event without next batch ID
return
logger.debug(
- "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+ "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
)
- # Keep track of the insertion event and the chunk ID
+ # Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "next_chunk_id": next_chunk_id,
+ "next_batch_id": next_batch_id,
},
)
@@ -1822,8 +1822,8 @@ class PersistEventsStore:
},
)
- def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
- """Handles inserting the chunk edges/connections between the chunk event
+ def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles inserting the batch edges/connections between the batch event
and an insertion event. Part of MSC2716.
Args:
@@ -1831,11 +1831,11 @@ class PersistEventsStore:
event: The event to process
"""
- if event.type != EventTypes.MSC2716_CHUNK:
- # Not a chunk event
+ if event.type != EventTypes.MSC2716_BATCH:
+ # Not a batch event
return
- # Skip processing a chunk event if the room version doesn't
+ # Skip processing a batch event if the room version doesn't
# support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
room_creator = self.db_pool.simple_select_one_onecol_txn(
@@ -1852,35 +1852,35 @@ class PersistEventsStore:
):
return
- chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
- if chunk_id is None:
- # Invalid chunk event without a chunk ID
+ batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
+ if batch_id is None:
+ # Invalid batch event without a batch ID
return
- logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+ logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
- # Keep track of the insertion event and the chunk ID
+ # Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
- table="chunk_events",
+ table="batch_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "chunk_id": chunk_id,
+ "batch_id": batch_id,
},
)
- # When we receive an event with a `chunk_id` referencing the
- # `next_chunk_id` of the insertion event, we can remove it from the
+ # When we receive an event with a `batch_id` referencing the
+ # `next_batch_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
- WHERE next_chunk_id = ?
+ WHERE next_batch_id = ?
)
"""
- txn.execute(sql, (chunk_id,))
+ txn.execute(sql, (batch_id,))
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 6fcb2b8353..1afc59fafb 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -490,7 +490,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
txn=txn,
table="event_forward_extremities",
column="event_id",
- iterable=to_delete,
+ values=to_delete,
keyvalues={},
)
@@ -520,7 +520,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
txn=txn,
table="_extremities_to_check",
column="event_id",
- iterable=original_set,
+ values=original_set,
keyvalues={},
)
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 63ac09c61d..a93caae8d0 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -324,7 +324,7 @@ class PusherWorkerStore(SQLBaseStore):
txn,
table="pushers",
column="user_name",
- iterable=users,
+ values=users,
keyvalues={},
)
@@ -373,7 +373,7 @@ class PusherWorkerStore(SQLBaseStore):
txn,
table="pushers",
column="id",
- iterable=(pusher_id for pusher_id, token in pushers if token is None),
+ values=[pusher_id for pusher_id, token in pushers if token is None],
keyvalues={},
)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index edeaacd7a6..01a4281301 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Tuple
from twisted.internet import defer
@@ -153,12 +153,12 @@ class ReceiptsWorkerStore(SQLBaseStore):
}
async def get_linearized_receipts_for_rooms(
- self, room_ids: List[str], to_key: int, from_key: Optional[int] = None
+ self, room_ids: Iterable[str], to_key: int, from_key: Optional[int] = None
) -> List[dict]:
"""Get receipts for multiple rooms for sending to clients.
Args:
- room_id: List of room_ids.
+            room_ids: The room IDs to fetch receipts of.
to_key: Max stream id to fetch receipts up to.
from_key: Min stream id to fetch receipts from. None fetches
from the start.
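
Widening the annotation from `List[str]` to `Iterable[str]` lets callers pass sets, tuples, or dict views without first copying into a list; since the function only iterates over the room IDs, `Iterable` is the more honest contract. A short typing illustration:

```python
from typing import Iterable


def count_rooms(room_ids: Iterable[str]) -> int:
    # Only iterated once, so any iterable is acceptable.
    return sum(1 for _ in room_ids)


print(count_rooms(["!a:hs", "!b:hs"]))  # a list works
print(count_rooms({"!a:hs", "!b:hs"}))  # a set works too; under List[str]
                                        # mypy would reject this call
```
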
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
new file mode 100644
index 0000000000..a383388757
--- /dev/null
+++ b/synapse/storage/databases/main/room_batch.py
@@ -0,0 +1,36 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Optional
+
+from synapse.storage._base import SQLBaseStore
+
+
+class RoomBatchStore(SQLBaseStore):
+ async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
+ """Retrieve a insertion event ID.
+
+ Args:
+ batch_id: The batch ID of the insertion event to retrieve.
+
+ Returns:
+            The event_id of an insertion event, or None if there is no known
+            insertion event for the given batch ID.
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ table="insertion_events",
+ keyvalues={"next_batch_id": batch_id},
+ retcol="event_id",
+ allow_none=True,
+ )
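
With `allow_none=True`, `simple_select_one_onecol` returns `None` rather than raising when no row matches, so callers can probe for a batch's insertion event safely. Roughly the single-row lookup the new method reduces to; this sketch uses raw SQL where the real helper goes through Synapse's database pool:

```python
import sqlite3
from typing import Optional


def get_insertion_event_by_batch_id_sketch(
    txn: sqlite3.Cursor, batch_id: str
) -> Optional[str]:
    txn.execute(
        "SELECT event_id FROM insertion_events WHERE next_batch_id = ?",
        (batch_id,),
    )
    row = txn.fetchone()
    # Mirrors allow_none=True: no matching row yields None, not an error.
    return row[0] if row else None
```
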
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 8e22da99ae..a8e8dd4577 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -473,7 +473,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
txn,
table="current_state_events",
column="room_id",
- iterable=to_delete,
+ values=to_delete,
keyvalues={},
)
@@ -481,7 +481,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
txn,
table="event_forward_extremities",
column="room_id",
- iterable=to_delete,
+ values=to_delete,
keyvalues={},
)
diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index 4d6bbc94c7..340ca9e47d 100644
--- a/synapse/storage/databases/main/ui_auth.py
+++ b/synapse/storage/databases/main/ui_auth.py
@@ -326,7 +326,7 @@ class UIAuthWorkerStore(SQLBaseStore):
txn,
table="ui_auth_sessions_ips",
column="session_id",
- iterable=session_ids,
+ values=session_ids,
keyvalues={},
)
@@ -377,7 +377,7 @@ class UIAuthWorkerStore(SQLBaseStore):
txn,
table="ui_auth_sessions_credentials",
column="session_id",
- iterable=session_ids,
+ values=session_ids,
keyvalues={},
)
@@ -386,7 +386,7 @@ class UIAuthWorkerStore(SQLBaseStore):
txn,
table="ui_auth_sessions",
column="session_id",
- iterable=session_ids,
+ values=session_ids,
keyvalues={},
)
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 8aebdc2817..718f3e9976 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -85,19 +85,17 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
del rooms
- # If search all users is on, get all the users we want to add.
- if self.hs.config.user_directory_search_all_users:
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_users(user_id TEXT NOT NULL)"
- )
- txn.execute(sql)
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_users(user_id TEXT NOT NULL)"
+ )
+ txn.execute(sql)
- txn.execute("SELECT name FROM users")
- users = [{"user_id": x[0]} for x in txn.fetchall()]
+ txn.execute("SELECT name FROM users")
+ users = [{"user_id": x[0]} for x in txn.fetchall()]
- self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+ self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
new_pos = await self.get_max_stream_id_in_current_state_deltas()
await self.db_pool.runInteraction(
@@ -265,13 +263,8 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
async def _populate_user_directory_process_users(self, progress, batch_size):
"""
- If search_all_users is enabled, add all of the users to the user directory.
+ Add all local users to the user directory.
"""
- if not self.hs.config.user_directory_search_all_users:
- await self.db_pool.updates._end_background_update(
- "populate_user_directory_process_users"
- )
- return 1
def _get_next_batch(txn):
sql = "SELECT user_id FROM %s LIMIT %s" % (
|