diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 79a58df591..ac5239e50a 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -28,7 +28,12 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import Cache, SQLBaseStore, db_to_json
+from synapse.storage._base import (
+ Cache,
+ SQLBaseStore,
+ db_to_json,
+ make_in_list_sql_clause,
+)
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
@@ -448,11 +453,14 @@ class DeviceWorkerStore(SQLBaseStore):
sql = """
SELECT DISTINCT user_id FROM device_lists_stream
WHERE stream_id > ?
- AND user_id IN (%s)
+ AND
"""
for chunk in batch_iter(to_check, 100):
- txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "user_id", chunk
+ )
+ txn.execute(sql + clause, (from_key,) + tuple(args))
changes.update(user_id for user_id, in txn)
return changes
@@ -512,17 +520,9 @@ class DeviceWorkerStore(SQLBaseStore):
return results
-class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
+class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
- super(DeviceStore, self).__init__(db_conn, hs)
-
- # Map of (user_id, device_id) -> bool. If there is an entry that implies
- # the device exists.
- self.device_id_exists_cache = Cache(
- name="device_id_exists", keylen=2, max_entries=10000
- )
-
- self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
+ super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"device_lists_stream_idx",
@@ -556,6 +556,31 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
)
@defer.inlineCallbacks
+ def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
+ txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+ return 1
+
+
+class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(DeviceStore, self).__init__(db_conn, hs)
+
+ # Map of (user_id, device_id) -> bool. If there is an entry that implies
+ # the device exists.
+ self.device_id_exists_cache = Cache(
+ name="device_id_exists", keylen=2, max_entries=10000
+ )
+
+ self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
def store_device(self, user_id, device_id, initial_device_display_name):
"""Ensure the given device is known; add it to the store if not
@@ -910,15 +935,3 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"_prune_old_outbound_device_pokes",
_prune_txn,
)
-
- @defer.inlineCallbacks
- def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
- def f(conn):
- txn = conn.cursor()
- txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
- txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
- txn.close()
-
- yield self.runWithConnection(f)
- yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
- return 1
|