diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a85633efcd..0836e247ef 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -277,6 +277,10 @@ class DataStore(
FROM users as u
LEFT JOIN profiles AS p ON u.name = p.full_user_id
LEFT JOIN erased_users AS eu ON u.name = eu.user_id
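+            -- Attach each user's most recent activity timestamp, taken as the
+            -- latest last_seen recorded for them in user_ips.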
+ LEFT JOIN (
+ SELECT user_id, MAX(last_seen) AS last_seen_ts
+ FROM user_ips GROUP BY user_id
+ ) ls ON u.name = ls.user_id
{where_clause}
"""
sql = "SELECT COUNT(*) as total_users " + sql_base
@@ -286,7 +290,7 @@ class DataStore(
sql = f"""
SELECT name, user_type, is_guest, admin, deactivated, shadow_banned,
displayname, avatar_url, creation_ts * 1000 as creation_ts, approved,
- eu.user_id is not null as erased
+ eu.user_id is not null as erased, last_seen_ts
{sql_base}
ORDER BY {order_by_column} {order}, u.name ASC
LIMIT ? OFFSET ?
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c1353b18c1..0c1ed75240 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -978,26 +978,12 @@ class PersistEventsStore:
"""Persist the mapping from transaction IDs to event IDs (if defined)."""
inserted_ts = self._clock.time_msec()
- to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = []
to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = []
for event, _ in events_and_contexts:
txn_id = getattr(event.internal_metadata, "txn_id", None)
- token_id = getattr(event.internal_metadata, "token_id", None)
device_id = getattr(event.internal_metadata, "device_id", None)
if txn_id is not None:
- if token_id is not None:
- to_insert_token_id.append(
- (
- event.event_id,
- event.room_id,
- event.sender,
- token_id,
- txn_id,
- inserted_ts,
- )
- )
-
if device_id is not None:
to_insert_device_id.append(
(
@@ -1010,26 +996,7 @@ class PersistEventsStore:
)
)
- # Synapse usually relies on the device_id to scope transactions for events,
- # except for users without device IDs (appservice, guests, and access
- # tokens minted with the admin API) which use the access token ID instead.
- #
- # TODO https://github.com/matrix-org/synapse/issues/16042
- if to_insert_token_id:
- self.db_pool.simple_insert_many_txn(
- txn,
- table="event_txn_id",
- keys=(
- "event_id",
- "room_id",
- "user_id",
- "token_id",
- "txn_id",
- "inserted_ts",
- ),
- values=to_insert_token_id,
- )
-
+        # Synapse relies on the device_id to scope transactions for events.
if to_insert_device_id:
self.db_pool.simple_insert_many_txn(
txn,
@@ -1671,7 +1638,7 @@ class PersistEventsStore:
if self._ephemeral_messages_enabled:
# If there's an expiry timestamp on the event, store it.
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
- if type(expiry_ts) is int and not event.is_state():
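+            # Use `type` rather than `isinstance` so that bools (a subclass
+            # of int) are rejected, hence the E721 suppression.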
+ if type(expiry_ts) is int and not event.is_state(): # noqa: E721
self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)
# Insert into the room_memberships table.
@@ -2039,10 +2006,10 @@ class PersistEventsStore:
):
if (
"min_lifetime" in event.content
- and type(event.content["min_lifetime"]) is not int
+ and type(event.content["min_lifetime"]) is not int # noqa: E721
) or (
"max_lifetime" in event.content
- and type(event.content["max_lifetime"]) is not int
+ and type(event.content["max_lifetime"]) is not int # noqa: E721
):
            # Ignore the event if one of the values isn't an integer.
return
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d0dd455aec..943666ed4f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2022,25 +2022,6 @@ class EventsWorkerStore(SQLBaseStore):
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)
- async def get_event_id_from_transaction_id_and_token_id(
- self, room_id: str, user_id: str, token_id: int, txn_id: str
- ) -> Optional[str]:
- """Look up if we have already persisted an event for the transaction ID,
- returning the event ID if so.
- """
- return await self.db_pool.simple_select_one_onecol(
- table="event_txn_id",
- keyvalues={
- "room_id": room_id,
- "user_id": user_id,
- "token_id": token_id,
- "txn_id": txn_id,
- },
- retcol="event_id",
- allow_none=True,
- desc="get_event_id_from_transaction_id_and_token_id",
- )
-
async def get_event_id_from_transaction_id_and_device_id(
self, room_id: str, user_id: str, device_id: str, txn_id: str
) -> Optional[str]:
@@ -2072,29 +2053,35 @@ class EventsWorkerStore(SQLBaseStore):
"""
mapping = {}
- txn_id_to_event: Dict[Tuple[str, int, str], str] = {}
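+        # Map from (room ID, sender, device ID, transaction ID) to event ID.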
+ txn_id_to_event: Dict[Tuple[str, str, str, str], str] = {}
for event in events:
- token_id = getattr(event.internal_metadata, "token_id", None)
+ device_id = getattr(event.internal_metadata, "device_id", None)
txn_id = getattr(event.internal_metadata, "txn_id", None)
- if token_id and txn_id:
+ if device_id and txn_id:
# Check if this is a duplicate of an event in the given events.
- existing = txn_id_to_event.get((event.room_id, token_id, txn_id))
+ existing = txn_id_to_event.get(
+ (event.room_id, event.sender, device_id, txn_id)
+ )
if existing:
mapping[event.event_id] = existing
continue
# Check if this is a duplicate of an event we've already
# persisted.
- existing = await self.get_event_id_from_transaction_id_and_token_id(
- event.room_id, event.sender, token_id, txn_id
+ existing = await self.get_event_id_from_transaction_id_and_device_id(
+ event.room_id, event.sender, device_id, txn_id
)
if existing:
mapping[event.event_id] = existing
- txn_id_to_event[(event.room_id, token_id, txn_id)] = existing
+ txn_id_to_event[
+ (event.room_id, event.sender, device_id, txn_id)
+ ] = existing
else:
- txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id
+ txn_id_to_event[
+ (event.room_id, event.sender, device_id, txn_id)
+ ] = event.event_id
return mapping
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 54d40e7a3a..5a01ec2137 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -17,7 +17,7 @@ from types import TracebackType
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary
-from twisted.internet.interfaces import IReactorCore
+from twisted.internet.task import LoopingCall
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
@@ -26,6 +26,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import random_string
@@ -358,7 +359,7 @@ class Lock:
def __init__(
self,
- reactor: IReactorCore,
+ reactor: ISynapseReactor,
clock: Clock,
store: LockStore,
read_write: bool,
@@ -377,19 +378,25 @@ class Lock:
self._table = "worker_read_write_locks" if read_write else "worker_locks"
- self._looping_call = clock.looping_call(
+ # We might be called from a non-main thread, so we defer setting up the
+ # looping call.
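+        # `callFromThread` is the thread-safe way to schedule work on the
+        # reactor from another thread.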
+ self._looping_call: Optional[LoopingCall] = None
+ reactor.callFromThread(self._setup_looping_call)
+
+ self._dropped = False
+
+ def _setup_looping_call(self) -> None:
+ self._looping_call = self._clock.looping_call(
self._renew,
_RENEWAL_INTERVAL_MS,
- store,
- clock,
- read_write,
- lock_name,
- lock_key,
- token,
+ self._store,
+ self._clock,
+ self._read_write,
+ self._lock_name,
+ self._lock_key,
+ self._token,
)
- self._dropped = False
-
@staticmethod
@wrap_as_background_process("Lock._renew")
async def _renew(
@@ -459,7 +466,7 @@ class Lock:
if self._dropped:
return
- if self._looping_call.running:
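+        # The looping call is set up on the reactor thread, so it may not
+        # exist yet.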
+ if self._looping_call and self._looping_call.running:
self._looping_call.stop()
await self._store.db_pool.simple_delete(
@@ -486,8 +493,9 @@ class Lock:
# We should not be dropped without the lock being released (unless
# we're shutting down), but if we are then let's at least stop
# renewing the lock.
- if self._looping_call.running:
- self._looping_call.stop()
+ if self._looping_call and self._looping_call.running:
+ # We might be called from a non-main thread.
+ self._reactor.callFromThread(self._looping_call.stop)
if self._reactor.running:
logger.error(
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index c13c0bc7d7..bec0dc2afe 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -88,7 +88,6 @@ def _load_rules(
msc1767_enabled=experimental_config.msc1767_enabled,
msc3664_enabled=experimental_config.msc3664_enabled,
msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
- msc3958_suppress_edits_enabled=experimental_config.msc3958_supress_edit_notifs,
)
return filtered_rules
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index d3a01d526f..7e85b73e8e 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -206,8 +206,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
consent_server_notice_sent, appservice_id, creation_ts, user_type,
deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned,
COALESCE(approved, TRUE) AS approved,
- COALESCE(locked, FALSE) AS locked
+ COALESCE(locked, FALSE) AS locked, last_seen_ts
FROM users
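+            -- Pull in the user's most recent activity time, i.e. the latest
+            -- last_seen recorded for them in user_ips.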
+ LEFT JOIN (
+ SELECT user_id, MAX(last_seen) AS last_seen_ts
+ FROM user_ips GROUP BY user_id
+ ) ls ON users.name = ls.user_id
WHERE name = ?
""",
(user_id,),
@@ -268,6 +272,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
is_shadow_banned=bool(user_data["shadow_banned"]),
user_id=UserID.from_string(user_data["name"]),
user_type=user_data["user_type"],
+ last_seen_ts=user_data["last_seen_ts"],
)
async def is_trial_user(self, user_id: str) -> bool:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 6298f0984d..3a2966b9e4 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -107,6 +107,7 @@ class UserSortOrder(Enum):
AVATAR_URL = "avatar_url"
SHADOW_BANNED = "shadow_banned"
CREATION_TS = "creation_ts"
+ LAST_SEEN_TS = "last_seen_ts"
class StatsStore(StateDeltasStore):
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 860bbf7c0f..efd21b5bfc 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
import logging
from enum import Enum
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast
import attr
from canonicaljson import encode_canonical_json
@@ -28,8 +28,8 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.types import JsonDict
-from synapse.util.caches.descriptors import cached
+from synapse.types import JsonDict, StrCollection
+from synapse.util.caches.descriptors import cached, cachedList
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -205,6 +205,26 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
else:
return None
+ @cachedList(
+ cached_method_name="get_destination_retry_timings", list_name="destinations"
+ )
+ async def get_destination_retry_timings_batch(
+ self, destinations: StrCollection
+ ) -> Dict[str, Optional[DestinationRetryTimings]]:
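+        """Look up the retry timings for a batch of destinations.
+
+        Returns:
+            A map from destination to its retry timings (None for destinations
+            with no timings recorded).
+        """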
+ rows = await self.db_pool.simple_select_many_batch(
+ table="destinations",
+ iterable=destinations,
+ column="destination",
+ retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
+ desc="get_destination_retry_timings_batch",
+ )
+
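+        # Omit destinations without recorded timings: the @cachedList
+        # machinery fills in None for any keys missing from this dict.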
+ return {
+ row.pop("destination"): DestinationRetryTimings(**row)
+ for row in rows
+ if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"]
+ }
+
async def set_destination_retry_timings(
self,
destination: str,
|