diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/oidc_config.py | 11 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 125 | ||||
-rw-r--r-- | synapse/rest/admin/users.py | 21 | ||||
-rw-r--r-- | synapse/storage/database.py | 11 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_push_actions.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 18 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_bg_updates.py | 12 | ||||
-rw-r--r-- | synapse/storage/databases/main/media_repository.py | 10 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/registration.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/59/01ignored_user.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/search.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/state/store.py | 4 |
15 files changed, 132 insertions, 104 deletions
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py index d58a83be7f..bfeceeed18 100644 --- a/synapse/config/oidc_config.py +++ b/synapse/config/oidc_config.py @@ -15,6 +15,7 @@ # limitations under the License. import string +from collections import Counter from typing import Iterable, Optional, Tuple, Type import attr @@ -43,6 +44,16 @@ class OIDCConfig(Config): except DependencyException as e: raise ConfigError(e.message) from e + # check we don't have any duplicate idp_ids now. (The SSO handler will also + # check for duplicates when the REST listeners get registered, but that happens + # after synapse has forked so doesn't give nice errors.) + c = Counter([i.idp_id for i in self.oidc_providers]) + for idp_id, count in c.items(): + if count > 1: + raise ConfigError( + "Multiple OIDC providers have the idp_id %r." % idp_id + ) + public_baseurl = self.public_baseurl self.oidc_callback_url = public_baseurl + "_synapse/oidc/callback" diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 302b2f69bc..d330ae5dbc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -18,6 +18,7 @@ import copy import itertools import logging from typing import ( + TYPE_CHECKING, Any, Awaitable, Callable, @@ -26,7 +27,6 @@ from typing import ( List, Mapping, Optional, - Sequence, Tuple, TypeVar, Union, @@ -61,6 +61,9 @@ from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) @@ -80,10 +83,10 @@ class InvalidResponseError(RuntimeError): class FederationClient(FederationBase): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.pdu_destination_tried = {} + self.pdu_destination_tried = {} # type: Dict[str, Dict[str, int]] self._clock.looping_call(self._clear_tried_cache, 60 * 1000) self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() @@ -116,33 +119,32 @@ class FederationClient(FederationBase): self.pdu_destination_tried[event_id] = destination_dict @log_function - def make_query( + async def make_query( self, - destination, - query_type, - args, - retry_on_dns_fail=False, - ignore_backoff=False, - ): + destination: str, + query_type: str, + args: dict, + retry_on_dns_fail: bool = False, + ignore_backoff: bool = False, + ) -> JsonDict: """Sends a federation Query to a remote homeserver of the given type and arguments. Args: - destination (str): Domain name of the remote homeserver - query_type (str): Category of the query type; should match the + destination: Domain name of the remote homeserver + query_type: Category of the query type; should match the handler name used in register_query_handler(). - args (dict): Mapping of strings to strings containing the details + args: Mapping of strings to strings containing the details of the query request. - ignore_backoff (bool): true to ignore the historical backoff data + ignore_backoff: true to ignore the historical backoff data and try the request anyway. Returns: - a Awaitable which will eventually yield a JSON object from the - response + The JSON object from the response """ sent_queries_counter.labels(query_type).inc() - return self.transport_layer.make_query( + return await self.transport_layer.make_query( destination, query_type, args, @@ -151,42 +153,52 @@ class FederationClient(FederationBase): ) @log_function - def query_client_keys(self, destination, content, timeout): + async def query_client_keys( + self, destination: str, content: JsonDict, timeout: int + ) -> JsonDict: """Query device keys for a device hosted on a remote server. Args: - destination (str): Domain name of the remote homeserver - content (dict): The query content. + destination: Domain name of the remote homeserver + content: The query content. Returns: - an Awaitable which will eventually yield a JSON object from the - response + The JSON object from the response """ sent_queries_counter.labels("client_device_keys").inc() - return self.transport_layer.query_client_keys(destination, content, timeout) + return await self.transport_layer.query_client_keys( + destination, content, timeout + ) @log_function - def query_user_devices(self, destination, user_id, timeout=30000): + async def query_user_devices( + self, destination: str, user_id: str, timeout: int = 30000 + ) -> JsonDict: """Query the device keys for a list of user ids hosted on a remote server. """ sent_queries_counter.labels("user_devices").inc() - return self.transport_layer.query_user_devices(destination, user_id, timeout) + return await self.transport_layer.query_user_devices( + destination, user_id, timeout + ) @log_function - def claim_client_keys(self, destination, content, timeout): + async def claim_client_keys( + self, destination: str, content: JsonDict, timeout: int + ) -> JsonDict: """Claims one-time keys for a device hosted on a remote server. Args: - destination (str): Domain name of the remote homeserver - content (dict): The query content. + destination: Domain name of the remote homeserver + content: The query content. Returns: - an Awaitable which will eventually yield a JSON object from the - response + The JSON object from the response """ sent_queries_counter.labels("client_one_time_keys").inc() - return self.transport_layer.claim_client_keys(destination, content, timeout) + return await self.transport_layer.claim_client_keys( + destination, content, timeout + ) async def backfill( self, dest: str, room_id: str, limit: int, extremities: Iterable[str] @@ -195,10 +207,10 @@ class FederationClient(FederationBase): given destination server. Args: - dest (str): The remote homeserver to ask. - room_id (str): The room_id to backfill. - limit (int): The maximum number of events to return. - extremities (list): our current backwards extremities, to backfill from + dest: The remote homeserver to ask. + room_id: The room_id to backfill. + limit: The maximum number of events to return. + extremities: our current backwards extremities, to backfill from """ logger.debug("backfill extrem=%s", extremities) @@ -370,7 +382,7 @@ class FederationClient(FederationBase): for events that have failed their checks Returns: - Deferred : A list of PDUs that have valid signatures and hashes. + A list of PDUs that have valid signatures and hashes. """ deferreds = self._check_sigs_and_hashes(room_version, pdus) @@ -418,7 +430,9 @@ class FederationClient(FederationBase): else: return [p for p in valid_pdus if p] - async def get_event_auth(self, destination, room_id, event_id): + async def get_event_auth( + self, destination: str, room_id: str, event_id: str + ) -> List[EventBase]: res = await self.transport_layer.get_event_auth(destination, room_id, event_id) room_version = await self.store.get_room_version(room_id) @@ -700,18 +714,16 @@ class FederationClient(FederationBase): return await self._try_destination_list("send_join", destinations, send_request) - async def _do_send_join(self, destination: str, pdu: EventBase): + async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict: time_now = self._clock.time_msec() try: - content = await self.transport_layer.send_join_v2( + return await self.transport_layer.send_join_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - - return content except HttpResponseException as e: if e.code in [400, 404]: err = e.to_synapse_error() @@ -769,7 +781,7 @@ class FederationClient(FederationBase): time_now = self._clock.time_msec() try: - content = await self.transport_layer.send_invite_v2( + return await self.transport_layer.send_invite_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -779,7 +791,6 @@ class FederationClient(FederationBase): "invite_room_state": pdu.unsigned.get("invite_room_state", []), }, ) - return content except HttpResponseException as e: if e.code in [400, 404]: err = e.to_synapse_error() @@ -842,18 +853,16 @@ class FederationClient(FederationBase): "send_leave", destinations, send_request ) - async def _do_send_leave(self, destination, pdu): + async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict: time_now = self._clock.time_msec() try: - content = await self.transport_layer.send_leave_v2( + return await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - - return content except HttpResponseException as e: if e.code in [400, 404]: err = e.to_synapse_error() @@ -879,7 +888,7 @@ class FederationClient(FederationBase): # content. return resp[1] - def get_public_rooms( + async def get_public_rooms( self, remote_server: str, limit: Optional[int] = None, @@ -887,7 +896,7 @@ class FederationClient(FederationBase): search_filter: Optional[Dict] = None, include_all_networks: bool = False, third_party_instance_id: Optional[str] = None, - ): + ) -> JsonDict: """Get the list of public rooms from a remote homeserver Args: @@ -901,8 +910,7 @@ class FederationClient(FederationBase): party instance Returns: - Awaitable[Dict[str, Any]]: The response from the remote server, or None if - `remote_server` is the same as the local server_name + The response from the remote server. Raises: HttpResponseException: There was an exception returned from the remote server @@ -910,7 +918,7 @@ class FederationClient(FederationBase): requests over federation """ - return self.transport_layer.get_public_rooms( + return await self.transport_layer.get_public_rooms( remote_server, limit, since_token, @@ -923,7 +931,7 @@ class FederationClient(FederationBase): self, destination: str, room_id: str, - earliest_events_ids: Sequence[str], + earliest_events_ids: Iterable[str], latest_events: Iterable[EventBase], limit: int, min_depth: int, @@ -974,7 +982,9 @@ class FederationClient(FederationBase): return signed_events - async def forward_third_party_invite(self, destinations, room_id, event_dict): + async def forward_third_party_invite( + self, destinations: Iterable[str], room_id: str, event_dict: JsonDict + ) -> None: for destination in destinations: if destination == self.server_name: continue @@ -983,7 +993,7 @@ class FederationClient(FederationBase): await self.transport_layer.exchange_third_party_invite( destination=destination, room_id=room_id, event_dict=event_dict ) - return None + return except CodeMessageException: raise except Exception as e: @@ -995,7 +1005,7 @@ class FederationClient(FederationBase): async def get_room_complexity( self, destination: str, room_id: str - ) -> Optional[dict]: + ) -> Optional[JsonDict]: """ Fetch the complexity of a remote room from another server. @@ -1008,10 +1018,9 @@ class FederationClient(FederationBase): could not fetch the complexity. """ try: - complexity = await self.transport_layer.get_room_complexity( + return await self.transport_layer.get_room_complexity( destination=destination, room_id=room_id ) - return complexity except CodeMessageException as e: # We didn't manage to get it -- probably a 404. We are okay if other # servers don't give it to us. diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index f39e3d6d5c..86198bab30 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -83,17 +83,32 @@ class UsersRestServletV2(RestServlet): The parameter `deactivated` can be used to include deactivated users. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() self.admin_handler = hs.get_admin_handler() - async def on_GET(self, request): + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) start = parse_integer(request, "from", default=0) limit = parse_integer(request, "limit", default=100) + + if start < 0: + raise SynapseError( + 400, + "Query parameter from must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + if limit < 0: + raise SynapseError( + 400, + "Query parameter limit must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + user_id = parse_string(request, "user_id", default=None) name = parse_string(request, "name", default=None) guests = parse_boolean(request, "guests", default=True) @@ -103,7 +118,7 @@ class UsersRestServletV2(RestServlet): start, limit, user_id, name, guests, deactivated ) ret = {"users": users, "total": total} - if len(users) >= limit: + if (start + limit) < total: ret["next_token"] = str(start + len(users)) return 200, ret diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a19d65ad23..d2ba4bd2fc 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -262,13 +262,18 @@ class LoggingTransaction: return self.txn.description def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None: + """Similar to `executemany`, except `txn.rowcount` will not be correct + afterwards. + + More efficient than `executemany` on PostgreSQL + """ + if isinstance(self.database_engine, PostgresEngine): from psycopg2.extras import execute_batch # type: ignore self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args) else: - for val in args: - self.execute(sql, val) + self.executemany(sql, args) def execute_values(self, sql: str, *args: Any) -> List[Tuple]: """Corresponds to psycopg2.extras.execute_values. Only available when @@ -888,7 +893,7 @@ class DatabasePool: ", ".join("?" for _ in keys[0]), ) - txn.executemany(sql, vals) + txn.execute_batch(sql, vals) async def simple_upsert( self, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9097677648..659d8f245f 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -897,7 +897,7 @@ class DeviceWorkerStore(SQLBaseStore): DELETE FROM device_lists_outbound_last_success WHERE destination = ? AND user_id = ? """ - txn.executemany(sql, ((row[0], row[1]) for row in rows)) + txn.execute_batch(sql, ((row[0], row[1]) for row in rows)) logger.info("Pruned %d device list outbound pokes", count) @@ -1343,7 +1343,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): # Delete older entries in the table, as we really only care about # when the latest change happened. - txn.executemany( + txn.execute_batch( """ DELETE FROM device_lists_stream WHERE user_id = ? AND device_id = ? AND stream_id < ? diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1b657191a9..438383abe1 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -487,7 +487,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): VALUES (?, ?, ?, ?, ?, ?) """ - txn.executemany( + txn.execute_batch( sql, ( _gen_entry(user_id, actions) @@ -803,7 +803,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ], ) - txn.executemany( + txn.execute_batch( """ UPDATE event_push_summary SET notif_count = ?, unread_count = ?, stream_ordering = ? diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 3216b3f3c8..5db7d7aaa8 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -876,7 +876,7 @@ class PersistEventsStore: WHERE room_id = ? AND type = ? AND state_key = ? ) """ - txn.executemany( + txn.execute_batch( sql, ( ( @@ -895,7 +895,7 @@ class PersistEventsStore: ) # Now we actually update the current_state_events table - txn.executemany( + txn.execute_batch( "DELETE FROM current_state_events" " WHERE room_id = ? AND type = ? AND state_key = ?", ( @@ -907,7 +907,7 @@ class PersistEventsStore: # We include the membership in the current state table, hence we do # a lookup when we insert. This assumes that all events have already # been inserted into room_memberships. - txn.executemany( + txn.execute_batch( """INSERT INTO current_state_events (room_id, type, state_key, event_id, membership) VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) @@ -927,7 +927,7 @@ class PersistEventsStore: # we have no record of the fact the user *was* a member of the # room but got, say, state reset out of it. if to_delete or to_insert: - txn.executemany( + txn.execute_batch( "DELETE FROM local_current_membership" " WHERE room_id = ? AND user_id = ?", ( @@ -938,7 +938,7 @@ class PersistEventsStore: ) if to_insert: - txn.executemany( + txn.execute_batch( """INSERT INTO local_current_membership (room_id, user_id, event_id, membership) VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) @@ -1738,7 +1738,7 @@ class PersistEventsStore: """ if events_and_contexts: - txn.executemany( + txn.execute_batch( sql, ( ( @@ -1767,7 +1767,7 @@ class PersistEventsStore: # Now we delete the staging area for *all* events that were being # persisted. - txn.executemany( + txn.execute_batch( "DELETE FROM event_push_actions_staging WHERE event_id = ?", ((event.event_id,) for event, _ in all_events_and_contexts), ) @@ -1886,7 +1886,7 @@ class PersistEventsStore: " )" ) - txn.executemany( + txn.execute_batch( query, [ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) @@ -1900,7 +1900,7 @@ class PersistEventsStore: "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" ) - txn.executemany( + txn.execute_batch( query, [ (ev.event_id, ev.room_id) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index e46e44ba54..5ca4fa6817 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -139,8 +139,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 - def reindex_txn(txn): sql = ( "SELECT stream_ordering, event_id, json FROM events" @@ -178,9 +176,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" - for index in range(0, len(update_rows), INSERT_CLUMP_SIZE): - clump = update_rows[index : index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) + txn.execute_batch(sql, update_rows) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -210,8 +206,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 - def reindex_search_txn(txn): sql = ( "SELECT stream_ordering, event_id FROM events" @@ -256,9 +250,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" - for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): - clump = rows_to_update[index : index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) + txn.execute_batch(sql, rows_to_update) progress = { "target_min_stream_id_inclusive": target_min_stream_id, diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 283c8a5e22..e017177655 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -417,7 +417,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " WHERE media_origin = ? AND media_id = ?" ) - txn.executemany( + txn.execute_batch( sql, ( (time_ms, media_origin, media_id) @@ -430,7 +430,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " WHERE media_id = ?" ) - txn.executemany(sql, ((time_ms, media_id) for media_id in local_media)) + txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media)) return await self.db_pool.runInteraction( "update_cached_last_access_time", update_cache_txn @@ -557,7 +557,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?" def _delete_url_cache_txn(txn): - txn.executemany(sql, [(media_id,) for media_id in media_ids]) + txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) return await self.db_pool.runInteraction( "delete_url_cache", _delete_url_cache_txn @@ -586,11 +586,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): def _delete_url_cache_media_txn(txn): sql = "DELETE FROM local_media_repository WHERE media_id = ?" - txn.executemany(sql, [(media_id,) for media_id in media_ids]) + txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?" - txn.executemany(sql, [(media_id,) for media_id in media_ids]) + txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) return await self.db_pool.runInteraction( "delete_url_cache_media", _delete_url_cache_media_txn diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 5d668aadb2..ecfc9f20b1 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -172,7 +172,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): ) # Update backward extremeties - txn.executemany( + txn.execute_batch( "INSERT INTO event_backward_extremities (room_id, event_id)" " VALUES (?, ?)", [(room_id, event_id) for event_id, in new_backwards_extrems], diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 8d05288ed4..585b4049d6 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1104,7 +1104,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): FROM user_threepids """ - txn.executemany(sql, [(id_server,) for id_server in id_servers]) + txn.execute_batch(sql, [(id_server,) for id_server in id_servers]) if id_servers: await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index dcdaf09682..92382bed28 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -873,8 +873,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): "max_stream_id_exclusive", self._stream_order_on_start + 1 ) - INSERT_CLUMP_SIZE = 1000 - def add_membership_profile_txn(txn): sql = """ SELECT stream_ordering, event_id, events.room_id, event_json.json @@ -915,9 +913,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): UPDATE room_memberships SET display_name = ?, avatar_url = ? WHERE event_id = ? AND room_id = ? """ - for index in range(0, len(to_update), INSERT_CLUMP_SIZE): - clump = to_update[index : index + INSERT_CLUMP_SIZE] - txn.executemany(to_update_sql, clump) + txn.execute_batch(to_update_sql, to_update) progress = { "target_min_stream_id_inclusive": target_min_stream_id, diff --git a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py index f35c70b699..9e8f35c1d2 100644 --- a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py +++ b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py @@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs # { "ignored_users": "@someone:example.org": {} } ignored_users = content.get("ignored_users", {}) if isinstance(ignored_users, dict) and ignored_users: - cur.executemany(insert_sql, [(user_id, u) for u in ignored_users]) + cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users]) # Add indexes after inserting data for efficiency. logger.info("Adding constraints to ignored_users table") diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index e34fce6281..871af64b11 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -63,7 +63,7 @@ class SearchWorkerStore(SQLBaseStore): for entry in entries ) - txn.executemany(sql, args) + txn.execute_batch(sql, args) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( @@ -75,7 +75,7 @@ class SearchWorkerStore(SQLBaseStore): for entry in entries ) - txn.executemany(sql, args) + txn.execute_batch(sql, args) else: # This should be unreachable. raise Exception("Unrecognized database engine") diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 0e31cc811a..89cdc84a9c 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -565,11 +565,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ) logger.info("[purge] removing redundant state groups") - txn.executemany( + txn.execute_batch( "DELETE FROM state_groups_state WHERE state_group = ?", ((sg,) for sg in state_groups_to_delete), ) - txn.executemany( + txn.execute_batch( "DELETE FROM state_groups WHERE id = ?", ((sg,) for sg in state_groups_to_delete), ) |