summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/oidc_config.py11
-rw-r--r--synapse/federation/federation_client.py125
-rw-r--r--synapse/rest/admin/users.py21
-rw-r--r--synapse/storage/database.py11
-rw-r--r--synapse/storage/databases/main/devices.py4
-rw-r--r--synapse/storage/databases/main/event_push_actions.py4
-rw-r--r--synapse/storage/databases/main/events.py18
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py12
-rw-r--r--synapse/storage/databases/main/media_repository.py10
-rw-r--r--synapse/storage/databases/main/purge_events.py2
-rw-r--r--synapse/storage/databases/main/registration.py2
-rw-r--r--synapse/storage/databases/main/roommember.py6
-rw-r--r--synapse/storage/databases/main/schema/delta/59/01ignored_user.py2
-rw-r--r--synapse/storage/databases/main/search.py4
-rw-r--r--synapse/storage/databases/state/store.py4
15 files changed, 132 insertions, 104 deletions
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index d58a83be7f..bfeceeed18 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import string
+from collections import Counter
 from typing import Iterable, Optional, Tuple, Type
 
 import attr
@@ -43,6 +44,16 @@ class OIDCConfig(Config):
         except DependencyException as e:
             raise ConfigError(e.message) from e
 
+        # check we don't have any duplicate idp_ids now. (The SSO handler will also
+        # check for duplicates when the REST listeners get registered, but that happens
+        # after synapse has forked so doesn't give nice errors.)
+        c = Counter([i.idp_id for i in self.oidc_providers])
+        for idp_id, count in c.items():
+            if count > 1:
+                raise ConfigError(
+                    "Multiple OIDC providers have the idp_id %r." % idp_id
+                )
+
         public_baseurl = self.public_baseurl
         self.oidc_callback_url = public_baseurl + "_synapse/oidc/callback"
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 302b2f69bc..d330ae5dbc 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -18,6 +18,7 @@ import copy
 import itertools
 import logging
 from typing import (
+    TYPE_CHECKING,
     Any,
     Awaitable,
     Callable,
@@ -26,7 +27,6 @@ from typing import (
     List,
     Mapping,
     Optional,
-    Sequence,
     Tuple,
     TypeVar,
     Union,
@@ -61,6 +61,9 @@ from synapse.util import unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.retryutils import NotRetryingDestination
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
@@ -80,10 +83,10 @@ class InvalidResponseError(RuntimeError):
 
 
 class FederationClient(FederationBase):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        self.pdu_destination_tried = {}
+        self.pdu_destination_tried = {}  # type: Dict[str, Dict[str, int]]
         self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
@@ -116,33 +119,32 @@ class FederationClient(FederationBase):
                 self.pdu_destination_tried[event_id] = destination_dict
 
     @log_function
-    def make_query(
+    async def make_query(
         self,
-        destination,
-        query_type,
-        args,
-        retry_on_dns_fail=False,
-        ignore_backoff=False,
-    ):
+        destination: str,
+        query_type: str,
+        args: dict,
+        retry_on_dns_fail: bool = False,
+        ignore_backoff: bool = False,
+    ) -> JsonDict:
         """Sends a federation Query to a remote homeserver of the given type
         and arguments.
 
         Args:
-            destination (str): Domain name of the remote homeserver
-            query_type (str): Category of the query type; should match the
+            destination: Domain name of the remote homeserver
+            query_type: Category of the query type; should match the
                 handler name used in register_query_handler().
-            args (dict): Mapping of strings to strings containing the details
+            args: Mapping of strings to strings containing the details
                 of the query request.
-            ignore_backoff (bool): true to ignore the historical backoff data
+            ignore_backoff: true to ignore the historical backoff data
                 and try the request anyway.
 
         Returns:
-            a Awaitable which will eventually yield a JSON object from the
-            response
+            The JSON object from the response
         """
         sent_queries_counter.labels(query_type).inc()
 
-        return self.transport_layer.make_query(
+        return await self.transport_layer.make_query(
             destination,
             query_type,
             args,
@@ -151,42 +153,52 @@ class FederationClient(FederationBase):
         )
 
     @log_function
-    def query_client_keys(self, destination, content, timeout):
+    async def query_client_keys(
+        self, destination: str, content: JsonDict, timeout: int
+    ) -> JsonDict:
         """Query device keys for a device hosted on a remote server.
 
         Args:
-            destination (str): Domain name of the remote homeserver
-            content (dict): The query content.
+            destination: Domain name of the remote homeserver
+            content: The query content.
 
         Returns:
-            an Awaitable which will eventually yield a JSON object from the
-            response
+            The JSON object from the response
         """
         sent_queries_counter.labels("client_device_keys").inc()
-        return self.transport_layer.query_client_keys(destination, content, timeout)
+        return await self.transport_layer.query_client_keys(
+            destination, content, timeout
+        )
 
     @log_function
-    def query_user_devices(self, destination, user_id, timeout=30000):
+    async def query_user_devices(
+        self, destination: str, user_id: str, timeout: int = 30000
+    ) -> JsonDict:
         """Query the device keys for a list of user ids hosted on a remote
         server.
         """
         sent_queries_counter.labels("user_devices").inc()
-        return self.transport_layer.query_user_devices(destination, user_id, timeout)
+        return await self.transport_layer.query_user_devices(
+            destination, user_id, timeout
+        )
 
     @log_function
-    def claim_client_keys(self, destination, content, timeout):
+    async def claim_client_keys(
+        self, destination: str, content: JsonDict, timeout: int
+    ) -> JsonDict:
         """Claims one-time keys for a device hosted on a remote server.
 
         Args:
-            destination (str): Domain name of the remote homeserver
-            content (dict): The query content.
+            destination: Domain name of the remote homeserver
+            content: The query content.
 
         Returns:
-            an Awaitable which will eventually yield a JSON object from the
-            response
+            The JSON object from the response
         """
         sent_queries_counter.labels("client_one_time_keys").inc()
-        return self.transport_layer.claim_client_keys(destination, content, timeout)
+        return await self.transport_layer.claim_client_keys(
+            destination, content, timeout
+        )
 
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
@@ -195,10 +207,10 @@ class FederationClient(FederationBase):
         given destination server.
 
         Args:
-            dest (str): The remote homeserver to ask.
-            room_id (str): The room_id to backfill.
-            limit (int): The maximum number of events to return.
-            extremities (list): our current backwards extremities, to backfill from
+            dest: The remote homeserver to ask.
+            room_id: The room_id to backfill.
+            limit: The maximum number of events to return.
+            extremities: our current backwards extremities, to backfill from
         """
         logger.debug("backfill extrem=%s", extremities)
 
@@ -370,7 +382,7 @@ class FederationClient(FederationBase):
                 for events that have failed their checks
 
         Returns:
-            Deferred : A list of PDUs that have valid signatures and hashes.
+            A list of PDUs that have valid signatures and hashes.
         """
         deferreds = self._check_sigs_and_hashes(room_version, pdus)
 
@@ -418,7 +430,9 @@ class FederationClient(FederationBase):
         else:
             return [p for p in valid_pdus if p]
 
-    async def get_event_auth(self, destination, room_id, event_id):
+    async def get_event_auth(
+        self, destination: str, room_id: str, event_id: str
+    ) -> List[EventBase]:
         res = await self.transport_layer.get_event_auth(destination, room_id, event_id)
 
         room_version = await self.store.get_room_version(room_id)
@@ -700,18 +714,16 @@ class FederationClient(FederationBase):
 
         return await self._try_destination_list("send_join", destinations, send_request)
 
-    async def _do_send_join(self, destination: str, pdu: EventBase):
+    async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict:
         time_now = self._clock.time_msec()
 
         try:
-            content = await self.transport_layer.send_join_v2(
+            return await self.transport_layer.send_join_v2(
                 destination=destination,
                 room_id=pdu.room_id,
                 event_id=pdu.event_id,
                 content=pdu.get_pdu_json(time_now),
             )
-
-            return content
         except HttpResponseException as e:
             if e.code in [400, 404]:
                 err = e.to_synapse_error()
@@ -769,7 +781,7 @@ class FederationClient(FederationBase):
         time_now = self._clock.time_msec()
 
         try:
-            content = await self.transport_layer.send_invite_v2(
+            return await self.transport_layer.send_invite_v2(
                 destination=destination,
                 room_id=pdu.room_id,
                 event_id=pdu.event_id,
@@ -779,7 +791,6 @@ class FederationClient(FederationBase):
                     "invite_room_state": pdu.unsigned.get("invite_room_state", []),
                 },
             )
-            return content
         except HttpResponseException as e:
             if e.code in [400, 404]:
                 err = e.to_synapse_error()
@@ -842,18 +853,16 @@ class FederationClient(FederationBase):
             "send_leave", destinations, send_request
         )
 
-    async def _do_send_leave(self, destination, pdu):
+    async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict:
         time_now = self._clock.time_msec()
 
         try:
-            content = await self.transport_layer.send_leave_v2(
+            return await self.transport_layer.send_leave_v2(
                 destination=destination,
                 room_id=pdu.room_id,
                 event_id=pdu.event_id,
                 content=pdu.get_pdu_json(time_now),
             )
-
-            return content
         except HttpResponseException as e:
             if e.code in [400, 404]:
                 err = e.to_synapse_error()
@@ -879,7 +888,7 @@ class FederationClient(FederationBase):
         # content.
         return resp[1]
 
-    def get_public_rooms(
+    async def get_public_rooms(
         self,
         remote_server: str,
         limit: Optional[int] = None,
@@ -887,7 +896,7 @@ class FederationClient(FederationBase):
         search_filter: Optional[Dict] = None,
         include_all_networks: bool = False,
         third_party_instance_id: Optional[str] = None,
-    ):
+    ) -> JsonDict:
         """Get the list of public rooms from a remote homeserver
 
         Args:
@@ -901,8 +910,7 @@ class FederationClient(FederationBase):
                 party instance
 
         Returns:
-            Awaitable[Dict[str, Any]]: The response from the remote server, or None if
-            `remote_server` is the same as the local server_name
+            The response from the remote server.
 
         Raises:
             HttpResponseException: There was an exception returned from the remote server
@@ -910,7 +918,7 @@ class FederationClient(FederationBase):
                 requests over federation
 
         """
-        return self.transport_layer.get_public_rooms(
+        return await self.transport_layer.get_public_rooms(
             remote_server,
             limit,
             since_token,
@@ -923,7 +931,7 @@ class FederationClient(FederationBase):
         self,
         destination: str,
         room_id: str,
-        earliest_events_ids: Sequence[str],
+        earliest_events_ids: Iterable[str],
         latest_events: Iterable[EventBase],
         limit: int,
         min_depth: int,
@@ -974,7 +982,9 @@ class FederationClient(FederationBase):
 
         return signed_events
 
-    async def forward_third_party_invite(self, destinations, room_id, event_dict):
+    async def forward_third_party_invite(
+        self, destinations: Iterable[str], room_id: str, event_dict: JsonDict
+    ) -> None:
         for destination in destinations:
             if destination == self.server_name:
                 continue
@@ -983,7 +993,7 @@ class FederationClient(FederationBase):
                 await self.transport_layer.exchange_third_party_invite(
                     destination=destination, room_id=room_id, event_dict=event_dict
                 )
-                return None
+                return
             except CodeMessageException:
                 raise
             except Exception as e:
@@ -995,7 +1005,7 @@ class FederationClient(FederationBase):
 
     async def get_room_complexity(
         self, destination: str, room_id: str
-    ) -> Optional[dict]:
+    ) -> Optional[JsonDict]:
         """
         Fetch the complexity of a remote room from another server.
 
@@ -1008,10 +1018,9 @@ class FederationClient(FederationBase):
             could not fetch the complexity.
         """
         try:
-            complexity = await self.transport_layer.get_room_complexity(
+            return await self.transport_layer.get_room_complexity(
                 destination=destination, room_id=room_id
             )
-            return complexity
         except CodeMessageException as e:
             # We didn't manage to get it -- probably a 404. We are okay if other
             # servers don't give it to us.
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index f39e3d6d5c..86198bab30 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -83,17 +83,32 @@ class UsersRestServletV2(RestServlet):
     The parameter `deactivated` can be used to include deactivated users.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
         self.admin_handler = hs.get_admin_handler()
 
-    async def on_GET(self, request):
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
 
         start = parse_integer(request, "from", default=0)
         limit = parse_integer(request, "limit", default=100)
+
+        if start < 0:
+            raise SynapseError(
+                400,
+                "Query parameter from must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        if limit < 0:
+            raise SynapseError(
+                400,
+                "Query parameter limit must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
         user_id = parse_string(request, "user_id", default=None)
         name = parse_string(request, "name", default=None)
         guests = parse_boolean(request, "guests", default=True)
@@ -103,7 +118,7 @@ class UsersRestServletV2(RestServlet):
             start, limit, user_id, name, guests, deactivated
         )
         ret = {"users": users, "total": total}
-        if len(users) >= limit:
+        if (start + limit) < total:
             ret["next_token"] = str(start + len(users))
 
         return 200, ret
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index a19d65ad23..d2ba4bd2fc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -262,13 +262,18 @@ class LoggingTransaction:
         return self.txn.description
 
     def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
+        """Similar to `executemany`, except `txn.rowcount` will not be correct
+        afterwards.
+
+        More efficient than `executemany` on PostgreSQL
+        """
+
         if isinstance(self.database_engine, PostgresEngine):
             from psycopg2.extras import execute_batch  # type: ignore
 
             self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
         else:
-            for val in args:
-                self.execute(sql, val)
+            self.executemany(sql, args)
 
     def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
         """Corresponds to psycopg2.extras.execute_values. Only available when
@@ -888,7 +893,7 @@ class DatabasePool:
             ", ".join("?" for _ in keys[0]),
         )
 
-        txn.executemany(sql, vals)
+        txn.execute_batch(sql, vals)
 
     async def simple_upsert(
         self,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 9097677648..659d8f245f 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -897,7 +897,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 DELETE FROM device_lists_outbound_last_success
                 WHERE destination = ? AND user_id = ?
             """
-            txn.executemany(sql, ((row[0], row[1]) for row in rows))
+            txn.execute_batch(sql, ((row[0], row[1]) for row in rows))
 
             logger.info("Pruned %d device list outbound pokes", count)
 
@@ -1343,7 +1343,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         # Delete older entries in the table, as we really only care about
         # when the latest change happened.
-        txn.executemany(
+        txn.execute_batch(
             """
             DELETE FROM device_lists_stream
             WHERE user_id = ? AND device_id = ? AND stream_id < ?
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 1b657191a9..438383abe1 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -487,7 +487,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 VALUES (?, ?, ?, ?, ?, ?)
             """
 
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     _gen_entry(user_id, actions)
@@ -803,7 +803,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             ],
         )
 
-        txn.executemany(
+        txn.execute_batch(
             """
                 UPDATE event_push_summary
                 SET notif_count = ?, unread_count = ?, stream_ordering = ?
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 3216b3f3c8..5db7d7aaa8 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -876,7 +876,7 @@ class PersistEventsStore:
                         WHERE room_id = ? AND type = ? AND state_key = ?
                     )
                 """
-                txn.executemany(
+                txn.execute_batch(
                     sql,
                     (
                         (
@@ -895,7 +895,7 @@ class PersistEventsStore:
                 )
                 # Now we actually update the current_state_events table
 
-                txn.executemany(
+                txn.execute_batch(
                     "DELETE FROM current_state_events"
                     " WHERE room_id = ? AND type = ? AND state_key = ?",
                     (
@@ -907,7 +907,7 @@ class PersistEventsStore:
                 # We include the membership in the current state table, hence we do
                 # a lookup when we insert. This assumes that all events have already
                 # been inserted into room_memberships.
-                txn.executemany(
+                txn.execute_batch(
                     """INSERT INTO current_state_events
                         (room_id, type, state_key, event_id, membership)
                     VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
@@ -927,7 +927,7 @@ class PersistEventsStore:
             # we have no record of the fact the user *was* a member of the
             # room but got, say, state reset out of it.
             if to_delete or to_insert:
-                txn.executemany(
+                txn.execute_batch(
                     "DELETE FROM local_current_membership"
                     " WHERE room_id = ? AND user_id = ?",
                     (
@@ -938,7 +938,7 @@ class PersistEventsStore:
                 )
 
             if to_insert:
-                txn.executemany(
+                txn.execute_batch(
                     """INSERT INTO local_current_membership
                         (room_id, user_id, event_id, membership)
                     VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
@@ -1738,7 +1738,7 @@ class PersistEventsStore:
         """
 
         if events_and_contexts:
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     (
@@ -1767,7 +1767,7 @@ class PersistEventsStore:
 
         # Now we delete the staging area for *all* events that were being
         # persisted.
-        txn.executemany(
+        txn.execute_batch(
             "DELETE FROM event_push_actions_staging WHERE event_id = ?",
             ((event.event_id,) for event, _ in all_events_and_contexts),
         )
@@ -1886,7 +1886,7 @@ class PersistEventsStore:
             " )"
         )
 
-        txn.executemany(
+        txn.execute_batch(
             query,
             [
                 (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
@@ -1900,7 +1900,7 @@ class PersistEventsStore:
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
         )
-        txn.executemany(
+        txn.execute_batch(
             query,
             [
                 (ev.event_id, ev.room_id)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index e46e44ba54..5ca4fa6817 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -139,8 +139,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
-        INSERT_CLUMP_SIZE = 1000
-
         def reindex_txn(txn):
             sql = (
                 "SELECT stream_ordering, event_id, json FROM events"
@@ -178,9 +176,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
 
-            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            txn.execute_batch(sql, update_rows)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
@@ -210,8 +206,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
-        INSERT_CLUMP_SIZE = 1000
-
         def reindex_search_txn(txn):
             sql = (
                 "SELECT stream_ordering, event_id FROM events"
@@ -256,9 +250,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
 
-            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            txn.execute_batch(sql, rows_to_update)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 283c8a5e22..e017177655 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -417,7 +417,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 " WHERE media_origin = ? AND media_id = ?"
             )
 
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     (time_ms, media_origin, media_id)
@@ -430,7 +430,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 " WHERE media_id = ?"
             )
 
-            txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
+            txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media))
 
         return await self.db_pool.runInteraction(
             "update_cached_last_access_time", update_cache_txn
@@ -557,7 +557,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
         return await self.db_pool.runInteraction(
             "delete_url_cache", _delete_url_cache_txn
@@ -586,11 +586,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         def _delete_url_cache_media_txn(txn):
             sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
             sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
         return await self.db_pool.runInteraction(
             "delete_url_cache_media", _delete_url_cache_media_txn
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 5d668aadb2..ecfc9f20b1 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -172,7 +172,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         )
 
         # Update backward extremeties
-        txn.executemany(
+        txn.execute_batch(
             "INSERT INTO event_backward_extremities (room_id, event_id)"
             " VALUES (?, ?)",
             [(room_id, event_id) for event_id, in new_backwards_extrems],
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 8d05288ed4..585b4049d6 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1104,7 +1104,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
                 FROM user_threepids
             """
 
-            txn.executemany(sql, [(id_server,) for id_server in id_servers])
+            txn.execute_batch(sql, [(id_server,) for id_server in id_servers])
 
         if id_servers:
             await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index dcdaf09682..92382bed28 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -873,8 +873,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
             "max_stream_id_exclusive", self._stream_order_on_start + 1
         )
 
-        INSERT_CLUMP_SIZE = 1000
-
         def add_membership_profile_txn(txn):
             sql = """
                 SELECT stream_ordering, event_id, events.room_id, event_json.json
@@ -915,9 +913,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
                 UPDATE room_memberships SET display_name = ?, avatar_url = ?
                 WHERE event_id = ? AND room_id = ?
             """
-            for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
-                clump = to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(to_update_sql, clump)
+            txn.execute_batch(to_update_sql, to_update)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
diff --git a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
index f35c70b699..9e8f35c1d2 100644
--- a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
+++ b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
@@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
         # { "ignored_users": "@someone:example.org": {} }
         ignored_users = content.get("ignored_users", {})
         if isinstance(ignored_users, dict) and ignored_users:
-            cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])
+            cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users])
 
     # Add indexes after inserting data for efficiency.
     logger.info("Adding constraints to ignored_users table")
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index e34fce6281..871af64b11 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -63,7 +63,7 @@ class SearchWorkerStore(SQLBaseStore):
                 for entry in entries
             )
 
-            txn.executemany(sql, args)
+            txn.execute_batch(sql, args)
 
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
@@ -75,7 +75,7 @@ class SearchWorkerStore(SQLBaseStore):
                 for entry in entries
             )
 
-            txn.executemany(sql, args)
+            txn.execute_batch(sql, args)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 0e31cc811a..89cdc84a9c 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -565,11 +565,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             )
 
         logger.info("[purge] removing redundant state groups")
-        txn.executemany(
+        txn.execute_batch(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             ((sg,) for sg in state_groups_to_delete),
         )
-        txn.executemany(
+        txn.execute_batch(
             "DELETE FROM state_groups WHERE id = ?",
             ((sg,) for sg in state_groups_to_delete),
         )