Diffstat (limited to 'synapse/storage/databases/main')
 synapse/storage/databases/main/client_ips.py       | 38 ++++++++++++--------
 synapse/storage/databases/main/deviceinbox.py      | 31 +++++++++++-------
 synapse/storage/databases/main/event_federation.py |  5 +++++
 3 files changed, 52 insertions(+), 22 deletions(-)
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 5a6c56b2d9..400d742bce 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -471,18 +471,15 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
         #
         # This works by finding the max last_seen that is less than the given
         # time, but has no more than N rows before it, deleting all rows with
-        # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
-        # returns exactly one row).
+        # a lesser last_seen time. (We use an `IN` clause to force postgres to
+        # use the index, otherwise it tends to do a seq scan).
         sql = """
             DELETE FROM user_ips
-            WHERE last_seen <= (
-                SELECT COALESCE(MAX(last_seen), -1)
-                FROM (
-                    SELECT last_seen FROM user_ips
-                    WHERE last_seen <= ?
-                    ORDER BY last_seen ASC
-                    LIMIT 5000
-                ) AS u
+            WHERE last_seen IN (
+                SELECT last_seen FROM user_ips
+                WHERE last_seen <= ?
+                ORDER BY last_seen ASC
+                LIMIT 5000
             )
         """
 
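The new query prunes by timestamp value rather than row count: the inner SELECT picks the 5000 smallest `last_seen` values at or below the cutoff, and the outer DELETE removes every row carrying one of them. Below is a minimal sketch of driving this to completion from Python; the `prune_old_user_ips` name, plain DB-API cursor, and `?` paramstyle are assumptions for illustration, not Synapse's actual calling code.

# A sketch of running the batched DELETE until the table is fully pruned.
# Assumes a DB-API connection `conn` and sqlite-style `?` placeholders.
PRUNE_BATCH_SQL = """
    DELETE FROM user_ips
    WHERE last_seen IN (
        SELECT last_seen FROM user_ips
        WHERE last_seen <= ?
        ORDER BY last_seen ASC
        LIMIT 5000
    )
"""

def prune_old_user_ips(conn, cutoff_ms: int) -> None:
    # Because the inner SELECT returns timestamp *values*, a single pass can
    # delete more than 5000 rows when rows share a last_seen value, but each
    # pass still only walks a bounded prefix of the last_seen index.
    while True:
        cur = conn.cursor()
        cur.execute(PRUNE_BATCH_SQL, (cutoff_ms,))
        conn.commit()
        if cur.rowcount == 0:
            break

The `IN` form gives up the old COALESCE version's guarantee that the sub-SELECT yields exactly one row, in exchange for a plan that postgres will reliably serve from the `last_seen` index.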
@@ -595,6 +592,27 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
         device_id: Optional[str],
         now: Optional[int] = None,
     ) -> None:
+        """Record that `user_id` used `access_token` from this `ip` address.
+
+        This method does two things.
+
+        1. It queues up a row to be upserted into the `client_ips` table. These happen
+           periodically; see _update_client_ips_batch.
+        2. It immediately records this user as having taken action for the purposes of
+           MAU tracking.
+
+        Any DB writes take place on the background tasks worker, falling back to the
+        main process. If we're not that worker, this method emits a replication payload
+        to run this logic on that worker.
+
+        Two caveats to note:
+
+         - We only take action once per LAST_SEEN_GRANULARITY, to avoid spamming the
+           DB with writes.
+         - Requests using the sliding-sync proxy's user agent are excluded, as its
+           requests are not directly driven by end-users. This is a hack and we're not
+           very proud of it.
+        """
        # The sync proxy continuously triggers /sync even if the user is not
        # present, so its requests should be excluded from user_ips entries.
         if user_agent == "sync-v3-proxy-":
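The once-per-`LAST_SEEN_GRANULARITY` caveat in the new docstring amounts to an in-memory throttle on the write path. The sketch below is a hedged illustration of the idea only; the cache shape and the `should_record` helper are hypothetical, and Synapse's real implementation lives in its own caching layer around `insert_client_ip`.

from typing import Dict, Optional, Tuple

# 120s in ms here; Synapse defines its own LAST_SEEN_GRANULARITY constant
# in client_ips.py.
LAST_SEEN_GRANULARITY = 120 * 1000

# (user_id, access_token, ip) -> last time we recorded activity, in ms.
_last_seen: Dict[Tuple[str, Optional[str], str], int] = {}

def should_record(
    user_id: str, access_token: Optional[str], ip: str, now_ms: int
) -> bool:
    # Skip the DB write if this tuple was already recorded recently.
    key = (user_id, access_token, ip)
    last = _last_seen.get(key)
    if last is not None and now_ms - last < LAST_SEEN_GRANULARITY:
        return False
    _last_seen[key] = now_ms
    return True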
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 753bb32a46..40477b9da0 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -92,25 +92,32 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 self._instance_name in hs.config.worker.writers.to_device
             )
 
-            self._device_inbox_id_gen: AbstractStreamIdGenerator = (
+            self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
                 MultiWriterIdGenerator(
                     db_conn=db_conn,
                     db=database,
                     notifier=hs.get_replication_notifier(),
                     stream_name="to_device",
                     instance_name=self._instance_name,
-                    tables=[("device_inbox", "instance_name", "stream_id")],
+                    tables=[
+                        ("device_inbox", "instance_name", "stream_id"),
+                        ("device_federation_outbox", "instance_name", "stream_id"),
+                    ],
                     sequence_name="device_inbox_sequence",
                     writers=hs.config.worker.writers.to_device,
                 )
             )
         else:
             self._can_write_to_device = True
-            self._device_inbox_id_gen = StreamIdGenerator(
-                db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
+            self._to_device_msg_id_gen = StreamIdGenerator(
+                db_conn,
+                hs.get_replication_notifier(),
+                "device_inbox",
+                "stream_id",
+                extra_tables=[("device_federation_outbox", "stream_id")],
             )
 
-        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+        max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
         device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
             db_conn,
             "device_inbox",
@@ -150,8 +157,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
     ) -> None:
         if stream_name == ToDeviceStream.NAME:
            # If replication is happening then postgres must be in use.
-            assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
-            self._device_inbox_id_gen.advance(instance_name, token)
+            assert isinstance(self._to_device_msg_id_gen, MultiWriterIdGenerator)
+            self._to_device_msg_id_gen.advance(instance_name, token)
             for row in rows:
                 if row.entity.startswith("@"):
                     self._device_inbox_stream_cache.entity_has_changed(
@@ -167,11 +174,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         self, stream_name: str, instance_name: str, token: int
     ) -> None:
         if stream_name == ToDeviceStream.NAME:
-            self._device_inbox_id_gen.advance(instance_name, token)
+            self._to_device_msg_id_gen.advance(instance_name, token)
         super().process_replication_position(stream_name, instance_name, token)
 
     def get_to_device_stream_token(self) -> int:
-        return self._device_inbox_id_gen.get_current_token()
+        return self._to_device_msg_id_gen.get_current_token()
 
     async def get_messages_for_user_devices(
         self,
@@ -806,7 +813,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                                 msg.get(EventContentFields.TO_DEVICE_MSGID),
                             )
 
-        async with self._device_inbox_id_gen.get_next() as stream_id:
+        async with self._to_device_msg_id_gen.get_next() as stream_id:
             now_ms = self._clock.time_msec()
             await self.db_pool.runInteraction(
                 "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
@@ -818,7 +825,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     destination, stream_id
                 )
 
-        return self._device_inbox_id_gen.get_current_token()
+        return self._to_device_msg_id_gen.get_current_token()
 
     async def add_messages_from_remote_to_device_inbox(
         self,
@@ -862,7 +869,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 txn, stream_id, local_messages_by_user_then_device
             )
 
-        async with self._device_inbox_id_gen.get_next() as stream_id:
+        async with self._to_device_msg_id_gen.get_next() as stream_id:
             now_ms = self._clock.time_msec()
             await self.db_pool.runInteraction(
                 "add_messages_from_remote_to_device_inbox",
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c83b26b702..ddc2baf95d 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -307,6 +307,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # Add the initial set of chains, excluding the sequence corresponding to
         # initial event.
         for chain_id, seq_no in event_chains.items():
+            # Check if the initial event is the first item in the chain. If so, then
+            # there is nothing new to add from this chain.
+            if seq_no == 1:
+                continue
+
             chains[chain_id] = max(seq_no - 1, chains.get(chain_id, 0))
 
         # Now for each chain we figure out the maximum sequence number reachable
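For context on the new early `continue`: in the chain-cover index every event sits at a `(chain_id, sequence_number)` position and implicitly reaches all lower positions in its own chain, so an event at position 1 has nothing beneath it. A standalone toy run of the loop, with made-up chain data, shows why such chains are skipped:

# chain_id -> the initial event's position in that chain (made-up numbers).
event_chains = {101: 1, 102: 4, 103: 2}

# chain_id -> highest sequence number known reachable in that chain.
chains: dict[int, int] = {}

for chain_id, seq_no in event_chains.items():
    # An initial event that is first in its chain reaches nothing below it,
    # so skipping avoids inserting a useless `chains[chain_id] = 0` entry.
    if seq_no == 1:
        continue
    chains[chain_id] = max(seq_no - 1, chains.get(chain_id, 0))

assert chains == {102: 3, 103: 1}  # chain 101 contributes nothing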