diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index fe6d6ecfe0..417ac8dc7c 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import List, Tuple
+from typing import List, Optional, Set, Tuple
from six import iteritems
@@ -649,21 +649,31 @@ class DeviceWorkerStore(SQLBaseStore):
return results
@defer.inlineCallbacks
- def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]):
+ def get_user_ids_requiring_device_list_resync(
+ self, user_ids: Optional[Collection[str]] = None,
+ ) -> Set[str]:
"""Given a list of remote users return the list of users that we
- should resync the device lists for.
+ should resync the device lists for. If None is given instead of a list,
+ return every user that we should resync the device lists for.
Returns:
- Deferred[Set[str]]
+ The IDs of users whose device lists need resync.
"""
-
- rows = yield self.db.simple_select_many_batch(
- table="device_lists_remote_resync",
- column="user_id",
- iterable=user_ids,
- retcols=("user_id",),
- desc="get_user_ids_requiring_device_list_resync",
- )
+ if user_ids:
+ rows = yield self.db.simple_select_many_batch(
+ table="device_lists_remote_resync",
+ column="user_id",
+ iterable=user_ids,
+ retcols=("user_id",),
+ desc="get_user_ids_requiring_device_list_resync_with_iterable",
+ )
+ else:
+ rows = yield self.db.simple_select_list(
+ table="device_lists_remote_resync",
+ keyvalues=None,
+ retcols=("user_id",),
+ desc="get_user_ids_requiring_device_list_resync",
+ )
return {row["user_id"] for row in rows}
@@ -679,6 +689,25 @@ class DeviceWorkerStore(SQLBaseStore):
desc="make_remote_user_device_cache_as_stale",
)
+ def mark_remote_user_device_list_as_unsubscribed(self, user_id):
+ """Mark that we no longer track device lists for remote user.
+ """
+
+ def _mark_remote_user_device_list_as_unsubscribed_txn(txn):
+ self.db.simple_delete_txn(
+ txn,
+ table="device_lists_remote_extremeties",
+ keyvalues={"user_id": user_id},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
+ )
+
+ return self.db.runInteraction(
+ "mark_remote_user_device_list_as_unsubscribed",
+ _mark_remote_user_device_list_as_unsubscribed_txn,
+ )
+
class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
@@ -959,17 +988,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
desc="update_device",
)
- @defer.inlineCallbacks
- def mark_remote_user_device_list_as_unsubscribed(self, user_id):
- """Mark that we no longer track device lists for remote user.
- """
- yield self.db.simple_delete(
- table="device_lists_remote_extremeties",
- keyvalues={"user_id": user_id},
- desc="mark_remote_user_device_list_as_unsubscribed",
- )
- self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
-
def update_remote_device_list_cache_entry(
self, user_id, device_id, content, stream_id
):
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a97f8b3934..a6572571b4 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -138,10 +138,10 @@ class PersistEventsStore:
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
- # This should only exist on master for now
+ # This should only exist on instances that are configured to write
assert (
- hs.config.worker.worker_app is None
- ), "Can only instantiate PersistEventsStore on master"
+ hs.config.worker.writers.events == hs.get_instance_name()
+ ), "Can only instantiate EventsStore on master"
@_retry_on_integrity_error
@defer.inlineCallbacks
@@ -1590,3 +1590,31 @@ class PersistEventsStore:
if not ev.internal_metadata.is_outlier()
],
)
+
+ async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+ """Mark the invite has having been rejected even though we failed to
+ create a leave event for it.
+ """
+
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ def f(txn, stream_ordering):
+ txn.execute(sql, (stream_ordering, True, room_id, user_id))
+
+ # We also clear this entry from `local_current_membership`.
+ # Ideally we'd point to a leave event, but we don't have one, so
+ # nevermind.
+ self.db.simple_delete_txn(
+ txn,
+ table="local_current_membership",
+ keyvalues={"room_id": room_id, "user_id": user_id},
+ )
+
+ with self._stream_id_gen.get_next() as stream_ordering:
+ await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
+
+ return stream_ordering
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 9130b74eb5..213d69100a 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
- if hs.config.worker_app is None:
+ if hs.config.worker.writers.events == hs.get_instance_name():
# We are the process in charge of generating stream ids for events,
# so instantiate ID generators based on the database
self._stream_id_gen = StreamIdGenerator(
@@ -1289,12 +1289,12 @@ class EventsWorkerStore(SQLBaseStore):
async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
"""
- to_1, so_1 = await self._get_event_ordering(event_id1)
- to_2, so_2 = await self._get_event_ordering(event_id2)
+ to_1, so_1 = await self.get_event_ordering(event_id1)
+ to_2, so_2 = await self.get_event_ordering(event_id2)
return (to_1, so_1) > (to_2, so_2)
@cachedInlineCallbacks(max_entries=5000)
- def _get_event_ordering(self, event_id):
+ def get_event_ordering(self, event_id):
res = yield self.db.simple_select_one(
table="events",
retcols=["topological_ordering", "stream_ordering"],
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index a624d1f1b6..248ca1359e 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import Database
+from synapse.storage.database import Database, make_in_list_sql_clause
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@@ -146,6 +146,15 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
threepids (list[dict]): List of threepid dicts to reserve
"""
+ # XXX what is this function trying to achieve? It upserts into
+ # monthly_active_users for each *registered* reserved mau user, but why?
+ #
+ # - shouldn't there already be an entry for each reserved user (at least
+ # if they have been active recently)?
+ #
+ # - if it's important that the timestamp is kept up to date, why do we only
+ # run this at startup?
+
for tp in threepids:
user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
@@ -178,75 +187,57 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
"""
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
- query_args = [thirty_days_ago]
- base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
-
- # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
- # when len(reserved_users) == 0. Works fine on sqlite.
- if len(reserved_users) > 0:
- # questionmarks is a hack to overcome sqlite not supporting
- # tuples in 'WHERE IN %s'
- question_marks = ",".join("?" * len(reserved_users))
-
- query_args.extend(reserved_users)
- sql = base_sql + " AND user_id NOT IN ({})".format(question_marks)
- else:
- sql = base_sql
- txn.execute(sql, query_args)
+ in_clause, in_clause_args = make_in_list_sql_clause(
+ self.database_engine, "user_id", reserved_users
+ )
+
+ txn.execute(
+ "DELETE FROM monthly_active_users WHERE timestamp < ? AND NOT %s"
+ % (in_clause,),
+ [thirty_days_ago] + in_clause_args,
+ )
if self._limit_usage_by_mau:
# If MAU user count still exceeds the MAU threshold, then delete on
# a least recently active basis.
# Note it is not possible to write this query using OFFSET due to
# incompatibilities in how sqlite and postgres support the feature.
- # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
- # While Postgres does not require 'LIMIT', but also does not support
+ # Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be presents,
+ # while Postgres does not require 'LIMIT', but also does not support
# negative LIMIT values. So there is no way to write it that both can
# support
- if len(reserved_users) == 0:
- sql = """
- DELETE FROM monthly_active_users
- WHERE user_id NOT IN (
- SELECT user_id FROM monthly_active_users
- ORDER BY timestamp DESC
- LIMIT ?
- )
- """
- txn.execute(sql, ((self._max_mau_value),))
- # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
- # when len(reserved_users) == 0. Works fine on sqlite.
- else:
- # Must be >= 0 for postgres
- num_of_non_reserved_users_to_remove = max(
- self._max_mau_value - len(reserved_users), 0
- )
- # It is important to filter reserved users twice to guard
- # against the case where the reserved user is present in the
- # SELECT, meaning that a legitmate mau is deleted.
- sql = """
- DELETE FROM monthly_active_users
- WHERE user_id NOT IN (
- SELECT user_id FROM monthly_active_users
- WHERE user_id NOT IN ({})
- ORDER BY timestamp DESC
- LIMIT ?
- )
- AND user_id NOT IN ({})
- """.format(
- question_marks, question_marks
+ # Limit must be >= 0 for postgres
+ num_of_non_reserved_users_to_remove = max(
+ self._max_mau_value - len(reserved_users), 0
+ )
+
+ # It is important to filter reserved users twice to guard
+ # against the case where the reserved user is present in the
+ # SELECT, meaning that a legitimate mau is deleted.
+ sql = """
+ DELETE FROM monthly_active_users
+ WHERE user_id NOT IN (
+ SELECT user_id FROM monthly_active_users
+ WHERE NOT %s
+ ORDER BY timestamp DESC
+ LIMIT ?
)
-
- query_args = [
- *reserved_users,
- num_of_non_reserved_users_to_remove,
- *reserved_users,
- ]
-
- txn.execute(sql, query_args)
-
- # It seems poor to invalidate the whole cache, Postgres supports
+ AND NOT %s
+ """ % (
+ in_clause,
+ in_clause,
+ )
+
+ query_args = (
+ in_clause_args
+ + [num_of_non_reserved_users_to_remove]
+ + in_clause_args
+ )
+ txn.execute(sql, query_args)
+
+ # It seems poor to invalidate the whole cache. Postgres supports
# 'Returning' which would allow me to invalidate only the
# specific users, but sqlite has no way to do this and instead
# I would need to SELECT and the DELETE which without locking
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 1e9c850152..137ebac833 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -1046,29 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
super(RoomMemberStore, self).__init__(database, db_conn, hs)
- @defer.inlineCallbacks
- def locally_reject_invite(self, user_id, room_id):
- sql = (
- "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
- " room_id = ? AND invitee = ? AND locally_rejected is NULL"
- " AND replaced_by is NULL"
- )
-
- def f(txn, stream_ordering):
- txn.execute(sql, (stream_ordering, True, room_id, user_id))
-
- # We also clear this entry from `local_current_membership`.
- # Ideally we'd point to a leave event, but we don't have one, so
- # nevermind.
- self.db.simple_delete_txn(
- txn,
- table="local_current_membership",
- keyvalues={"room_id": room_id, "user_id": user_id},
- )
-
- with self._stream_id_gen.get_next() as stream_ordering:
- yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
|