diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 4ea56331c7..4a57af1a0e 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -465,18 +465,15 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
- # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
- # returns exactly one row).
+ # a lesser last_seen time. (We use an `IN` clause to force postgres to
+        # use the index; otherwise it tends to do a seq scan).
sql = """
DELETE FROM user_ips
- WHERE last_seen <= (
- SELECT COALESCE(MAX(last_seen), -1)
- FROM (
- SELECT last_seen FROM user_ips
- WHERE last_seen <= ?
- ORDER BY last_seen ASC
- LIMIT 5000
- ) AS u
+ WHERE last_seen IN (
+ SELECT last_seen FROM user_ips
+ WHERE last_seen <= ?
+ ORDER BY last_seen ASC
+ LIMIT 5000
)
"""
@@ -589,6 +586,27 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
device_id: Optional[str],
now: Optional[int] = None,
) -> None:
+ """Record that `user_id` used `access_token` from this `ip` address.
+
+ This method does two things.
+
+        1. It queues up a row to be upserted into the `user_ips` table; queued rows
+           are flushed to the DB periodically by _update_client_ips_batch.
+ 2. It immediately records this user as having taken action for the purposes of
+ MAU tracking.
+
+ Any DB writes take place on the background tasks worker, falling back to the
+ main process. If we're not that worker, this method emits a replication payload
+ to run this logic on that worker.
+
+ Two caveats to note:
+
+ - We only take action once per LAST_SEEN_GRANULARITY, to avoid spamming the
+ DB with writes.
+ - Requests using the sliding-sync proxy's user agent are excluded, as its
+ requests are not directly driven by end-users. This is a hack and we're not
+ very proud of it.
+ """
# The sync proxy continuously triggers /sync even if the user is not
        # present, so it should be excluded from user_ips entries.
if user_agent == "sync-v3-proxy-":
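The first caveat in the docstring amounts to a small last-seen cache consulted
before any write is queued. A minimal sketch, assuming a plain dict for the
cache and the two-minute granularity Synapse uses for this purpose (the helper
name is hypothetical):

    LAST_SEEN_GRANULARITY = 120 * 1000  # ms; assumed value

    def should_record(cache: dict, key: tuple, now_ms: int) -> bool:
        # Skip the DB write if this (user_id, access_token, ip) key was
        # already recorded within the last LAST_SEEN_GRANULARITY ms.
        last = cache.get(key)
        if last is not None and now_ms - last < LAST_SEEN_GRANULARITY:
            return False
        cache[key] = now_ms
        return True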
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 659631ab65..d0a2028045 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -87,25 +87,32 @@ class DeviceInboxWorkerStore(SQLBaseStore):
self._instance_name in hs.config.worker.writers.to_device
)
 
- self._device_inbox_id_gen: AbstractStreamIdGenerator = (
+ self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
- tables=[("device_inbox", "instance_name", "stream_id")],
+ tables=[
+ ("device_inbox", "instance_name", "stream_id"),
+ ("device_federation_outbox", "instance_name", "stream_id"),
+ ],
sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)
)
else:
self._can_write_to_device = True
- self._device_inbox_id_gen = StreamIdGenerator(
- db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
+ self._to_device_msg_id_gen = StreamIdGenerator(
+ db_conn,
+ hs.get_replication_notifier(),
+ "device_inbox",
+ "stream_id",
+ extra_tables=[("device_federation_outbox", "stream_id")],
)
 
- max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+ max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
@@ -145,8 +152,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
) -> None:
if stream_name == ToDeviceStream.NAME:
            # If replication is happening then postgres must be in use.
- assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
- self._device_inbox_id_gen.advance(instance_name, token)
+ assert isinstance(self._to_device_msg_id_gen, MultiWriterIdGenerator)
+ self._to_device_msg_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
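A note on the `row.entity` check above: entities on the to_device stream are
either Matrix user ids (which always start with "@") for local delivery, or
remote server names destined for the federation outbox. Illustrative values:

    # Invented stream entities, for illustration only.
    entities = ["@alice:example.org", "matrix.example.com"]
    local_users = [e for e in entities if e.startswith("@")]
    destinations = [e for e in entities if not e.startswith("@")]
    # local_users == ["@alice:example.org"]; destinations feed the outbox.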
@@ -162,11 +169,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == ToDeviceStream.NAME:
- self._device_inbox_id_gen.advance(instance_name, token)
+ self._to_device_msg_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)
 
def get_to_device_stream_token(self) -> int:
- return self._device_inbox_id_gen.get_current_token()
+ return self._to_device_msg_id_gen.get_current_token()
 
async def get_messages_for_user_devices(
self,
@@ -801,7 +808,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
msg.get(EventContentFields.TO_DEVICE_MSGID),
)
 
- async with self._device_inbox_id_gen.get_next() as stream_id:
+ async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
@@ -813,7 +820,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
destination, stream_id
)
 
- return self._device_inbox_id_gen.get_current_token()
+ return self._to_device_msg_id_gen.get_current_token()
 
async def add_messages_from_remote_to_device_inbox(
self,
@@ -857,7 +864,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
txn, stream_id, local_messages_by_user_then_device
)
 
- async with self._device_inbox_id_gen.get_next() as stream_id:
+ async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_from_remote_to_device_inbox",
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 7e992ca4a2..d5fb6ecc73 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -301,6 +301,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# Add the initial set of chains, excluding the sequence corresponding to
        # the initial event.
for chain_id, seq_no in event_chains.items():
+ # Check if the initial event is the first item in the chain. If so, then
+ # there is nothing new to add from this chain.
+ if seq_no == 1:
+ continue
+
chains[chain_id] = max(seq_no - 1, chains.get(chain_id, 0))
 
# Now for each chain we figure out the maximum sequence number reachable
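A worked example of the new short-circuit, with invented chain ids:
`event_chains` maps each chain the event belongs to onto the event's sequence
number within that chain, and `chains` accumulates the highest predecessor
position reachable per chain.

    event_chains = {"chain_a": 1, "chain_b": 4}
    chains: dict = {}
    for chain_id, seq_no in event_chains.items():
        if seq_no == 1:
            continue  # first event of chain_a: nothing earlier to add
        chains[chain_id] = max(seq_no - 1, chains.get(chain_id, 0))
    # chains == {"chain_b": 3}; chain_a contributes nothing, as intended.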