summary refs log tree commit diff
path: root/synapse/storage/data_stores/main
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/data_stores/main')
-rw-r--r--synapse/storage/data_stores/main/devices.py64
-rw-r--r--synapse/storage/data_stores/main/events.py34
-rw-r--r--synapse/storage/data_stores/main/events_worker.py8
-rw-r--r--synapse/storage/data_stores/main/monthly_active_users.py109
-rw-r--r--synapse/storage/data_stores/main/roommember.py23
5 files changed, 126 insertions, 112 deletions
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py

index fe6d6ecfe0..417ac8dc7c 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py
@@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +from typing import List, Optional, Set, Tuple from six import iteritems @@ -649,21 +649,31 @@ class DeviceWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]): + def get_user_ids_requiring_device_list_resync( + self, user_ids: Optional[Collection[str]] = None, + ) -> Set[str]: """Given a list of remote users return the list of users that we - should resync the device lists for. + should resync the device lists for. If None is given instead of a list, + return every user that we should resync the device lists for. Returns: - Deferred[Set[str]] + The IDs of users whose device lists need resync. """ - - rows = yield self.db.simple_select_many_batch( - table="device_lists_remote_resync", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync", - ) + if user_ids: + rows = yield self.db.simple_select_many_batch( + table="device_lists_remote_resync", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync_with_iterable", + ) + else: + rows = yield self.db.simple_select_list( + table="device_lists_remote_resync", + keyvalues=None, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync", + ) return {row["user_id"] for row in rows} @@ -679,6 +689,25 @@ class DeviceWorkerStore(SQLBaseStore): desc="make_remote_user_device_cache_as_stale", ) + def mark_remote_user_device_list_as_unsubscribed(self, user_id): + """Mark that we no longer track device lists for remote user. + """ + + def _mark_remote_user_device_list_as_unsubscribed_txn(txn): + self.db.simple_delete_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + ) + self._invalidate_cache_and_stream( + txn, self.get_device_list_last_stream_id_for_remote, (user_id,) + ) + + return self.db.runInteraction( + "mark_remote_user_device_list_as_unsubscribed", + _mark_remote_user_device_list_as_unsubscribed_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): @@ -959,17 +988,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): desc="update_device", ) - @defer.inlineCallbacks - def mark_remote_user_device_list_as_unsubscribed(self, user_id): - """Mark that we no longer track device lists for remote user. - """ - yield self.db.simple_delete( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - desc="mark_remote_user_device_list_as_unsubscribed", - ) - self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) - def update_remote_device_list_cache_entry( self, user_id, device_id, content, stream_id ): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a97f8b3934..a6572571b4 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py
@@ -138,10 +138,10 @@ class PersistEventsStore: self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator - # This should only exist on master for now + # This should only exist on instances that are configured to write assert ( - hs.config.worker.worker_app is None - ), "Can only instantiate PersistEventsStore on master" + hs.config.worker.writers.events == hs.get_instance_name() + ), "Can only instantiate EventsStore on master" @_retry_on_integrity_error @defer.inlineCallbacks @@ -1590,3 +1590,31 @@ class PersistEventsStore: if not ev.internal_metadata.is_outlier() ], ) + + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, (stream_ordering, True, room_id, user_id)) + + # We also clear this entry from `local_current_membership`. + # Ideally we'd point to a leave event, but we don't have one, so + # nevermind. + self.db.simple_delete_txn( + txn, + table="local_current_membership", + keyvalues={"room_id": room_id, "user_id": user_id}, + ) + + with self._stream_id_gen.get_next() as stream_ordering: + await self.db.runInteraction("locally_reject_invite", f, stream_ordering) + + return stream_ordering diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 9130b74eb5..213d69100a 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py
@@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if hs.config.worker_app is None: + if hs.config.worker.writers.events == hs.get_instance_name(): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database self._stream_id_gen = StreamIdGenerator( @@ -1289,12 +1289,12 @@ class EventsWorkerStore(SQLBaseStore): async def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream """ - to_1, so_1 = await self._get_event_ordering(event_id1) - to_2, so_2 = await self._get_event_ordering(event_id2) + to_1, so_1 = await self.get_event_ordering(event_id1) + to_2, so_2 = await self.get_event_ordering(event_id2) return (to_1, so_1) > (to_2, so_2) @cachedInlineCallbacks(max_entries=5000) - def _get_event_ordering(self, event_id): + def get_event_ordering(self, event_id): res = yield self.db.simple_select_one( table="events", retcols=["topological_ordering", "stream_ordering"], diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index a624d1f1b6..248ca1359e 100644 --- a/synapse/storage/data_stores/main/monthly_active_users.py +++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -17,7 +17,7 @@ import logging from twisted.internet import defer from synapse.storage._base import SQLBaseStore -from synapse.storage.database import Database +from synapse.storage.database import Database, make_in_list_sql_clause from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -146,6 +146,15 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): threepids (list[dict]): List of threepid dicts to reserve """ + # XXX what is this function trying to achieve? It upserts into + # monthly_active_users for each *registered* reserved mau user, but why? + # + # - shouldn't there already be an entry for each reserved user (at least + # if they have been active recently)? + # + # - if it's important that the timestamp is kept up to date, why do we only + # run this at startup? + for tp in threepids: user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"]) @@ -178,75 +187,57 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): """ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) - query_args = [thirty_days_ago] - base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?" - - # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres - # when len(reserved_users) == 0. Works fine on sqlite. - if len(reserved_users) > 0: - # questionmarks is a hack to overcome sqlite not supporting - # tuples in 'WHERE IN %s' - question_marks = ",".join("?" * len(reserved_users)) - - query_args.extend(reserved_users) - sql = base_sql + " AND user_id NOT IN ({})".format(question_marks) - else: - sql = base_sql - txn.execute(sql, query_args) + in_clause, in_clause_args = make_in_list_sql_clause( + self.database_engine, "user_id", reserved_users + ) + + txn.execute( + "DELETE FROM monthly_active_users WHERE timestamp < ? AND NOT %s" + % (in_clause,), + [thirty_days_ago] + in_clause_args, + ) if self._limit_usage_by_mau: # If MAU user count still exceeds the MAU threshold, then delete on # a least recently active basis. # Note it is not possible to write this query using OFFSET due to # incompatibilities in how sqlite and postgres support the feature. - # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present - # While Postgres does not require 'LIMIT', but also does not support + # Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be presents, + # while Postgres does not require 'LIMIT', but also does not support # negative LIMIT values. So there is no way to write it that both can # support - if len(reserved_users) == 0: - sql = """ - DELETE FROM monthly_active_users - WHERE user_id NOT IN ( - SELECT user_id FROM monthly_active_users - ORDER BY timestamp DESC - LIMIT ? - ) - """ - txn.execute(sql, ((self._max_mau_value),)) - # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres - # when len(reserved_users) == 0. Works fine on sqlite. - else: - # Must be >= 0 for postgres - num_of_non_reserved_users_to_remove = max( - self._max_mau_value - len(reserved_users), 0 - ) - # It is important to filter reserved users twice to guard - # against the case where the reserved user is present in the - # SELECT, meaning that a legitmate mau is deleted. - sql = """ - DELETE FROM monthly_active_users - WHERE user_id NOT IN ( - SELECT user_id FROM monthly_active_users - WHERE user_id NOT IN ({}) - ORDER BY timestamp DESC - LIMIT ? - ) - AND user_id NOT IN ({}) - """.format( - question_marks, question_marks + # Limit must be >= 0 for postgres + num_of_non_reserved_users_to_remove = max( + self._max_mau_value - len(reserved_users), 0 + ) + + # It is important to filter reserved users twice to guard + # against the case where the reserved user is present in the + # SELECT, meaning that a legitimate mau is deleted. + sql = """ + DELETE FROM monthly_active_users + WHERE user_id NOT IN ( + SELECT user_id FROM monthly_active_users + WHERE NOT %s + ORDER BY timestamp DESC + LIMIT ? ) - - query_args = [ - *reserved_users, - num_of_non_reserved_users_to_remove, - *reserved_users, - ] - - txn.execute(sql, query_args) - - # It seems poor to invalidate the whole cache, Postgres supports + AND NOT %s + """ % ( + in_clause, + in_clause, + ) + + query_args = ( + in_clause_args + + [num_of_non_reserved_users_to_remove] + + in_clause_args + ) + txn.execute(sql, query_args) + + # It seems poor to invalidate the whole cache. Postgres supports # 'Returning' which would allow me to invalidate only the # specific users, but sqlite has no way to do this and instead # I would need to SELECT and the DELETE which without locking diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 1e9c850152..137ebac833 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py
@@ -1046,29 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): super(RoomMemberStore, self).__init__(database, db_conn, hs) - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - # We also clear this entry from `local_current_membership`. - # Ideally we'd point to a leave event, but we don't have one, so - # nevermind. - self.db.simple_delete_txn( - txn, - table="local_current_membership", - keyvalues={"room_id": room_id, "user_id": user_id}, - ) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.db.runInteraction("locally_reject_invite", f, stream_ordering) - def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id."""