diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 941c07fce5..537696547c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -341,14 +341,11 @@ class SQLBaseStore(object):
expiration_ts,
)
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
"account_validity",
- values={
- "user_id": user_id,
- "expiration_ts_ms": expiration_ts,
- "email_sent": False,
- },
+ keyvalues={"user_id": user_id, },
+ values={"expiration_ts_ms": expiration_ts, "email_sent": False, },
)
def start_profiling(self):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 0b3c656e90..028848cf89 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -152,6 +152,29 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def get_expired_users(self):
+ """Get IDs of all expired users
+
+ Returns:
+ Deferred[list[str]]: List of expired user IDs
+ """
+ def get_expired_users_txn(txn, now_ms):
+ sql = """
+ SELECT user_id from account_validity
+ WHERE expiration_ts_ms <= ?
+ """
+ txn.execute(sql, (now_ms,))
+ rows = txn.fetchall()
+ return [row[0] for row in rows]
+
+ res = yield self.runInteraction(
+ "get_expired_users",
+ get_expired_users_txn,
+ self.clock.time_msec(),
+ )
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
def set_renewal_token_for_user(self, user_id, renewal_token):
"""Defines a renewal token for a given user.
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index c61dfa527f..db3d052d33 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -223,6 +223,69 @@ class RoomWorkerStore(SQLBaseStore):
else:
defer.returnValue(None)
+ @cachedInlineCallbacks()
+ def get_retention_policy_for_room(self, room_id):
+ """Get the retention policy for a given room.
+
+ If no retention policy has been found for this room, returns a policy defined
+ by the configured default policy (which has None as both the 'min_lifetime' and
+ the 'max_lifetime' if no default policy has been defined in the server's
+ configuration).
+
+ Args:
+ room_id (str): The ID of the room to get the retention policy of.
+
+ Returns:
+ dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
+ """
+ # If the room retention feature is disabled, return a policy with no minimum nor
+ # maximum, in order not to filter out events we should filter out when sending to
+ # the client.
+ if not self.config.retention_enabled:
+ defer.returnValue({
+ "min_lifetime": None,
+ "max_lifetime": None,
+ })
+
+ def get_retention_policy_for_room_txn(txn):
+ txn.execute(
+ """
+ SELECT min_lifetime, max_lifetime FROM room_retention
+ INNER JOIN current_state_events USING (event_id, room_id)
+ WHERE room_id = ?;
+ """,
+ (room_id,)
+ )
+
+ return self.cursor_to_dict(txn)
+
+ ret = yield self.runInteraction(
+ "get_retention_policy_for_room",
+ get_retention_policy_for_room_txn,
+ )
+
+ # If we don't know this room ID, ret will be None, in this case return the default
+ # policy.
+ if not ret:
+ defer.returnValue({
+ "min_lifetime": self.config.retention_default_min_lifetime,
+ "max_lifetime": self.config.retention_default_max_lifetime,
+ })
+
+ row = ret[0]
+
+ # If one of the room's policy's attributes isn't defined, use the matching
+ # attribute from the default policy.
+ # The default values will be None if no default policy has been defined, or if one
+ # of the attributes is missing from the default policy.
+ if row["min_lifetime"] is None:
+ row["min_lifetime"] = self.config.retention_default_min_lifetime
+
+ if row["max_lifetime"] is None:
+ row["max_lifetime"] = self.config.retention_default_max_lifetime
+
+ defer.returnValue(row)
+
class RoomStore(RoomWorkerStore, SearchStore):
def __init__(self, db_conn, hs):
@@ -835,58 +898,3 @@ class RoomStore(RoomWorkerStore, SearchStore):
)
defer.returnValue(rooms)
-
- @cachedInlineCallbacks()
- def get_retention_policy_for_room(self, room_id):
- """Get the retention policy for a given room.
-
- If no retention policy has been found for this room, returns a policy defined
- by the configured default policy (which has None as both the 'min_lifetime' and
- the 'max_lifetime' if no default policy has been defined in the server's
- configuration).
-
- Args:
- room_id (str): The ID of the room to get the retention policy of.
-
- Returns:
- dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
- """
-
- def get_retention_policy_for_room_txn(txn):
- txn.execute(
- """
- SELECT min_lifetime, max_lifetime FROM room_retention
- INNER JOIN current_state_events USING (event_id, room_id)
- WHERE room_id = ?;
- """,
- (room_id,)
- )
-
- return self.cursor_to_dict(txn)
-
- ret = yield self.runInteraction(
- "get_retention_policy_for_room",
- get_retention_policy_for_room_txn,
- )
-
- # If we don't know this room ID, ret will be None, in this case return the default
- # policy.
- if not ret:
- defer.returnValue({
- "min_lifetime": self.config.retention_default_min_lifetime,
- "max_lifetime": self.config.retention_default_max_lifetime,
- })
-
- row = ret[0]
-
- # If one of the room's policy's attributes isn't defined, use the matching
- # attribute from the default policy.
- # The default values will be None if no default policy has been defined, or if one
- # of the attributes is missing from the default policy.
- if row["min_lifetime"] is None:
- row["min_lifetime"] = self.config.retention_default_min_lifetime
-
- if row["max_lifetime"] is None:
- row["max_lifetime"] = self.config.retention_default_max_lifetime
-
- defer.returnValue(row)
|