diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 62497ab63f..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
logger = logging.getLogger(__name__)
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+ "drop_device_list_streams_non_unique_indexes"
+)
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
columns=["user_id", "device_id"],
)
+ # create a unique index on device_lists_remote_cache
+ self.register_background_index_update(
+ "device_lists_remote_cache_unique_idx",
+ index_name="device_lists_remote_cache_unique_id",
+ table="device_lists_remote_cache",
+ columns=["user_id", "device_id"],
+ unique=True,
+ )
+
+ # And one on device_lists_remote_extremeties
+ self.register_background_index_update(
+ "device_lists_remote_extremeties_unique_idx",
+ index_name="device_lists_remote_extremeties_unique_idx",
+ table="device_lists_remote_extremeties",
+ columns=["user_id"],
+ unique=True,
+ )
+
+ # once they complete, we can remove the old non-unique indexes.
+ self.register_background_update_handler(
+ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+ self._drop_device_list_streams_non_unique_indexes,
+ )
+
@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
@@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):
def update_remote_device_list_cache_entry(self, user_id, device_id, content,
stream_id):
- """Updates a single user's device in the cache.
+ """Updates a single device in the cache of a remote user's devicelist.
+
+ Note: assumes that we are the only thread that can be updating this user's
+ device list.
+
+ Args:
+ user_id (str): User to update device list for
+ device_id (str): ID of decivice being updated
+ content (dict): new data on this device
+ stream_id (int): the version of the device list
+
+ Returns:
+ Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache_entry",
@@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
},
values={
"content": json.dumps(content),
- }
+ },
+
+ # we don't need to lock, because we assume we are the only thread
+ # updating this user's devices.
+ lock=False,
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
},
values={
"stream_id": stream_id,
- }
+ },
+
+ # again, we can assume we are the only thread updating this user's
+ # extremity.
+ lock=False,
)
def update_remote_device_list_cache(self, user_id, devices, stream_id):
- """Replace the cache of the remote user's devices.
+ """Replace the entire cache of the remote user's devices.
+
+ Note: assumes that we are the only thread that can be updating this user's
+ device list.
+
+ Args:
+ user_id (str): User to update device list for
+ devices (list[dict]): list of device objects supplied over federation
+ stream_id (int): the version of the device list
+
+ Returns:
+ Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache",
@@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
},
values={
"stream_id": stream_id,
- }
+ },
+
+ # we don't need to lock, because we can assume we are the only thread
+ # updating this user's extremity.
+ lock=False,
)
def get_devices_by_remote(self, destination, from_stream_id):
@@ -722,3 +786,19 @@ class DeviceStore(SQLBaseStore):
"_prune_old_outbound_device_pokes",
_prune_txn,
)
+
+ @defer.inlineCallbacks
+ def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+ )
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+ )
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+ defer.returnValue(1)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3faca2a042..d3b9dea1d6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -477,7 +477,7 @@ class EventFederationStore(EventFederationWorkerStore):
"is_state": False,
}
for ev in events
- for e_id, _ in ev.prev_events
+ for e_id in ev.prev_event_ids()
],
)
@@ -510,7 +510,7 @@ class EventFederationStore(EventFederationWorkerStore):
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id, _ in ev.prev_events
+ for ev in events for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
])
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 919e855f3b..2047110b1d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -416,7 +416,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
)
if len_1:
all_single_prev_not_state = all(
- len(event.prev_events) == 1
+ len(event.prev_event_ids()) == 1
and not event.is_state()
for event, ctx in ev_ctx_rm
)
@@ -440,7 +440,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# guess this by looking at the prev_events and checking
# if they match the current forward extremities.
for ev, _ in ev_ctx_rm:
- prev_event_ids = set(e for e, _ in ev.prev_events)
+ prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
break
@@ -551,7 +551,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
result.difference_update(
e_id
for event in new_events
- for e_id, _ in event.prev_events
+ for e_id in event.prev_event_ids()
)
# Finally, remove any events which are prev_events of any existing events.
@@ -869,7 +869,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"auth_id": auth_id,
}
for event, _ in events_and_contexts
- for auth_id, _ in event.auth_events
+ for auth_id in event.auth_event_ids()
if event.is_state()
],
)
diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql
index 54841b3843..dd6dcb65f1 100644
--- a/synapse/storage/schema/delta/40/device_list_streams.sql
+++ b/synapse/storage/schema/delta/40/device_list_streams.sql
@@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache (
content TEXT NOT NULL
);
-CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
-
-
-- The last update we got for a user. Empty if we're not receiving updates for
-- that user.
CREATE TABLE device_lists_remote_extremeties (
@@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties (
stream_id TEXT NOT NULL
);
-CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
+-- we used to create non-unique indexes on these tables, but as of update 52 we create
+-- unique indexes concurrently:
+--
+-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
+-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
-- Stream of device lists updates. Includes both local and remotes
diff --git a/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
new file mode 100644
index 0000000000..bfa49e6f92
--- /dev/null
+++ b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will create a unique index on
+-- device_lists_remote_cache
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('device_lists_remote_cache_unique_idx', '{}');
+
+-- and one on device_lists_remote_extremeties
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES (
+ 'device_lists_remote_extremeties_unique_idx', '{}',
+
+ -- doesn't really depend on this, but we need to make sure both happen
+ -- before we drop the old indexes.
+ 'device_lists_remote_cache_unique_idx'
+ );
+
+-- once they complete, we can drop the old indexes.
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES (
+ 'drop_device_list_streams_non_unique_indexes', '{}',
+ 'device_lists_remote_extremeties_unique_idx'
+ );
|