diff options
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/client_ips.py | 38 | ||||
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 31 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 5 |
3 files changed, 52 insertions, 22 deletions
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 5a6c56b2d9..400d742bce 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -471,18 +471,15 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke # # This works by finding the max last_seen that is less than the given # time, but has no more than N rows before it, deleting all rows with - # a lesser last_seen time. (We COALESCE so that the sub-SELECT always - # returns exactly one row). + # a lesser last_seen time. (We use an `IN` clause to force postgres to + # use the index, otherwise it tends to do a seq scan). sql = """ DELETE FROM user_ips - WHERE last_seen <= ( - SELECT COALESCE(MAX(last_seen), -1) - FROM ( - SELECT last_seen FROM user_ips - WHERE last_seen <= ? - ORDER BY last_seen ASC - LIMIT 5000 - ) AS u + WHERE last_seen IN ( + SELECT last_seen FROM user_ips + WHERE last_seen <= ? + ORDER BY last_seen ASC + LIMIT 5000 ) """ @@ -595,6 +592,27 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke device_id: Optional[str], now: Optional[int] = None, ) -> None: + """Record that `user_id` used `access_token` from this `ip` address. + + This method does two things. + + 1. It queues up a row to be upserted into the `client_ips` table. These happen + periodically; see _update_client_ips_batch. + 2. It immediately records this user as having taken action for the purposes of + MAU tracking. + + Any DB writes take place on the background tasks worker, falling back to the + main process. If we're not that worker, this method emits a replication payload + to run this logic on that worker. + + Two caveats to note: + + - We only take action once per LAST_SEEN_GRANULARITY, to avoid spamming the + DB with writes. + - Requests using the sliding-sync proxy's user agent are excluded, as its + requests are not directly driven by end-users. This is a hack and we're not + very proud of it. + """ # The sync proxy continuously triggers /sync even if the user is not # present so should be excluded from user_ips entries. if user_agent == "sync-v3-proxy-": diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 753bb32a46..40477b9da0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -92,25 +92,32 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._instance_name in hs.config.worker.writers.to_device ) - self._device_inbox_id_gen: AbstractStreamIdGenerator = ( + self._to_device_msg_id_gen: AbstractStreamIdGenerator = ( MultiWriterIdGenerator( db_conn=db_conn, db=database, notifier=hs.get_replication_notifier(), stream_name="to_device", instance_name=self._instance_name, - tables=[("device_inbox", "instance_name", "stream_id")], + tables=[ + ("device_inbox", "instance_name", "stream_id"), + ("device_federation_outbox", "instance_name", "stream_id"), + ], sequence_name="device_inbox_sequence", writers=hs.config.worker.writers.to_device, ) ) else: self._can_write_to_device = True - self._device_inbox_id_gen = StreamIdGenerator( - db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id" + self._to_device_msg_id_gen = StreamIdGenerator( + db_conn, + hs.get_replication_notifier(), + "device_inbox", + "stream_id", + extra_tables=[("device_federation_outbox", "stream_id")], ) - max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + max_device_inbox_id = self._to_device_msg_id_gen.get_current_token() device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict( db_conn, "device_inbox", @@ -150,8 +157,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) -> None: if stream_name == ToDeviceStream.NAME: # If replication is happening than postgres must be being used. - assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator) - self._device_inbox_id_gen.advance(instance_name, token) + assert isinstance(self._to_device_msg_id_gen, MultiWriterIdGenerator) + self._to_device_msg_id_gen.advance(instance_name, token) for row in rows: if row.entity.startswith("@"): self._device_inbox_stream_cache.entity_has_changed( @@ -167,11 +174,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): self, stream_name: str, instance_name: str, token: int ) -> None: if stream_name == ToDeviceStream.NAME: - self._device_inbox_id_gen.advance(instance_name, token) + self._to_device_msg_id_gen.advance(instance_name, token) super().process_replication_position(stream_name, instance_name, token) def get_to_device_stream_token(self) -> int: - return self._device_inbox_id_gen.get_current_token() + return self._to_device_msg_id_gen.get_current_token() async def get_messages_for_user_devices( self, @@ -806,7 +813,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): msg.get(EventContentFields.TO_DEVICE_MSGID), ) - async with self._device_inbox_id_gen.get_next() as stream_id: + async with self._to_device_msg_id_gen.get_next() as stream_id: now_ms = self._clock.time_msec() await self.db_pool.runInteraction( "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id @@ -818,7 +825,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): destination, stream_id ) - return self._device_inbox_id_gen.get_current_token() + return self._to_device_msg_id_gen.get_current_token() async def add_messages_from_remote_to_device_inbox( self, @@ -862,7 +869,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn, stream_id, local_messages_by_user_then_device ) - async with self._device_inbox_id_gen.get_next() as stream_id: + async with self._to_device_msg_id_gen.get_next() as stream_id: now_ms = self._clock.time_msec() await self.db_pool.runInteraction( "add_messages_from_remote_to_device_inbox", diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index c83b26b702..ddc2baf95d 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -307,6 +307,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # Add the initial set of chains, excluding the sequence corresponding to # initial event. for chain_id, seq_no in event_chains.items(): + # Check if the initial event is the first item in the chain. If so, then + # there is nothing new to add from this chain. + if seq_no == 1: + continue + chains[chain_id] = max(seq_no - 1, chains.get(chain_id, 0)) # Now for each chain we figure out the maximum sequence number reachable |