summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-08-31 17:23:06 +0100
committerBrendan Abolivier <babolivier@matrix.org>2021-08-31 17:23:06 +0100
commit87e978bc76c50429784d8b1a31d00a4ace24e874 (patch)
tree5ff27244144244321a6586f0952a28e4dd0cf608 /synapse/storage/databases/main
parentMerge tag 'v1.35.1' into babolivier/dinsic_1.41.0 (diff)
parent 1.36.0 (diff)
downloadsynapse-87e978bc76c50429784d8b1a31d00a4ace24e874.tar.xz
Merge tag 'v1.36.0' into babolivier/dinsic_1.41.0
Synapse 1.36.0 (2021-06-15)
===========================

No significant changes.

Synapse 1.36.0rc2 (2021-06-11)
==============================

Bugfixes
--------

- Fix a bug which caused  presence updates to stop working some time after a restart, when using a presence writer worker. Broke in v1.33.0. ([\#10149](https://github.com/matrix-org/synapse/issues/10149))
- Fix a bug when using federation sender worker where it would send out more presence updates than necessary, leading to high resource usage. Broke in v1.33.0. ([\#10163](https://github.com/matrix-org/synapse/issues/10163))
- Fix a bug where Synapse could send the same presence update to a remote twice. ([\#10165](https://github.com/matrix-org/synapse/issues/10165))

Synapse 1.36.0rc1 (2021-06-08)
==============================

Features
--------

- Add new endpoint `/_matrix/client/r0/rooms/{roomId}/aliases` from Client-Server API r0.6.1 (previously [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432)). ([\#9224](https://github.com/matrix-org/synapse/issues/9224))
- Improve performance of incoming federation transactions in large rooms. ([\#9953](https://github.com/matrix-org/synapse/issues/9953), [\#9973](https://github.com/matrix-org/synapse/issues/9973))
- Rewrite logic around verifying JSON object and fetching server keys to be more performant and use less memory. ([\#10035](https://github.com/matrix-org/synapse/issues/10035))
- Add new admin APIs for unprotecting local media from quarantine. Contributed by @dklimpel. ([\#10040](https://github.com/matrix-org/synapse/issues/10040))
- Add new admin APIs to remove media by media ID from quarantine. Contributed by @dklimpel. ([\#10044](https://github.com/matrix-org/synapse/issues/10044))
- Make reason and score parameters optional for reporting content. Implements [MSC2414](https://github.com/matrix-org/matrix-doc/pull/2414). Contributed by Callum Brown. ([\#10077](https://github.com/matrix-org/synapse/issues/10077))
- Add support for routing more requests to workers. ([\#10084](https://github.com/matrix-org/synapse/issues/10084))
- Report OpenTracing spans for database activity. ([\#10113](https://github.com/matrix-org/synapse/issues/10113), [\#10136](https://github.com/matrix-org/synapse/issues/10136), [\#10141](https://github.com/matrix-org/synapse/issues/10141))
- Significantly reduce memory usage of joining large remote rooms. ([\#10117](https://github.com/matrix-org/synapse/issues/10117))

Bugfixes
--------

- Fixed a bug causing replication requests to fail when receiving a lot of events via federation. ([\#10082](https://github.com/matrix-org/synapse/issues/10082))
- Fix a bug in the `force_tracing_for_users` option introduced in Synapse v1.35 which meant that the OpenTracing spans produced were missing most tags. ([\#10092](https://github.com/matrix-org/synapse/issues/10092))
- Fixed a bug that could cause Synapse to stop notifying application services. Contributed by Willem Mulder. ([\#10107](https://github.com/matrix-org/synapse/issues/10107))
- Fix bug where the server would attempt to fetch the same history in the room from a remote server multiple times in parallel. ([\#10116](https://github.com/matrix-org/synapse/issues/10116))
- Fix a bug introduced in Synapse 1.33.0 which caused replication requests to fail when receiving a lot of very large events via federation. ([\#10118](https://github.com/matrix-org/synapse/issues/10118))
- Fix bug when using workers where pagination requests failed if a remote server returned zero events from `/backfill`. Introduced in 1.35.0. ([\#10133](https://github.com/matrix-org/synapse/issues/10133))

Improved Documentation
----------------------

- Clarify security note regarding hosting Synapse on the same domain as other web applications. ([\#9221](https://github.com/matrix-org/synapse/issues/9221))
- Update CAPTCHA documentation to mention turning off the verify origin feature. Contributed by @aaronraimist. ([\#10046](https://github.com/matrix-org/synapse/issues/10046))
- Tweak wording of database recommendation in `INSTALL.md`. Contributed by @aaronraimist. ([\#10057](https://github.com/matrix-org/synapse/issues/10057))
- Add initial infrastructure for rendering Synapse documentation with mdbook. ([\#10086](https://github.com/matrix-org/synapse/issues/10086))
- Convert the remaining Admin API documentation files to markdown. ([\#10089](https://github.com/matrix-org/synapse/issues/10089))
- Make a link in docs use HTTPS. Contributed by @RhnSharma. ([\#10130](https://github.com/matrix-org/synapse/issues/10130))
- Fix broken link in Docker docs. ([\#10132](https://github.com/matrix-org/synapse/issues/10132))

Deprecations and Removals
-------------------------

- Remove the experimental `spaces_enabled` flag. The spaces features are always available now. ([\#10063](https://github.com/matrix-org/synapse/issues/10063))

Internal Changes
----------------

- Tell CircleCI to build Docker images from `main` branch. ([\#9906](https://github.com/matrix-org/synapse/issues/9906))
- Simplify naming convention for release branches to only include the major and minor version numbers. ([\#10013](https://github.com/matrix-org/synapse/issues/10013))
- Add `parse_strings_from_args` for parsing an array from query parameters. ([\#10048](https://github.com/matrix-org/synapse/issues/10048), [\#10137](https://github.com/matrix-org/synapse/issues/10137))
- Remove some dead code regarding TLS certificate handling. ([\#10054](https://github.com/matrix-org/synapse/issues/10054))
- Remove redundant, unmaintained `convert_server_keys` script. ([\#10055](https://github.com/matrix-org/synapse/issues/10055))
- Improve the error message printed by synctl when synapse fails to start. ([\#10059](https://github.com/matrix-org/synapse/issues/10059))
- Fix GitHub Actions lint for newsfragments. ([\#10069](https://github.com/matrix-org/synapse/issues/10069))
- Update opentracing to inject the right context into the carrier. ([\#10074](https://github.com/matrix-org/synapse/issues/10074))
- Fix up `BatchingQueue` implementation. ([\#10078](https://github.com/matrix-org/synapse/issues/10078))
- Log method and path when dropping request due to size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091))
- In Github Actions workflows, summarize the Sytest results in an easy-to-read format. ([\#10094](https://github.com/matrix-org/synapse/issues/10094))
- Make `/sync` do fewer state resolutions. ([\#10102](https://github.com/matrix-org/synapse/issues/10102))
- Add missing type hints to the admin API servlets. ([\#10105](https://github.com/matrix-org/synapse/issues/10105))
- Improve opentracing annotations for `Notifier`. ([\#10111](https://github.com/matrix-org/synapse/issues/10111))
- Enable Prometheus metrics for the jaeger client library. ([\#10112](https://github.com/matrix-org/synapse/issues/10112))
- Work to improve the responsiveness of `/sync` requests. ([\#10124](https://github.com/matrix-org/synapse/issues/10124))
- OpenTracing: use a consistent name for background processes. ([\#10135](https://github.com/matrix-org/synapse/issues/10135))
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/cache.py7
-rw-r--r--synapse/storage/databases/main/devices.py2
-rw-r--r--synapse/storage/databases/main/event_push_actions.py2
-rw-r--r--synapse/storage/databases/main/events.py8
-rw-r--r--synapse/storage/databases/main/events_worker.py61
-rw-r--r--synapse/storage/databases/main/media_repository.py7
-rw-r--r--synapse/storage/databases/main/presence.py2
-rw-r--r--synapse/storage/databases/main/purge_events.py26
-rw-r--r--synapse/storage/databases/main/receipts.py6
-rw-r--r--synapse/storage/databases/main/room.py32
10 files changed, 111 insertions, 42 deletions
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py

index ecc1f935e2..c57ae5ef15 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -168,10 +168,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): backfilled, ): self._invalidate_get_event_cache(event_id) + self.have_seen_event.invalidate((room_id, event_id)) self.get_latest_event_ids_in_room.invalidate((room_id,)) - self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) + self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,)) if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) @@ -184,8 +185,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self.get_invited_rooms_for_local_user.invalidate((state_key,)) if relates_to: - self.get_relations_for_event.invalidate_many((relates_to,)) - self.get_aggregation_groups_for_event.invalidate_many((relates_to,)) + self.get_relations_for_event.invalidate((relates_to,)) + self.get_aggregation_groups_for_event.invalidate((relates_to,)) self.get_applicable_edit.invalidate((relates_to,)) async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]): diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index fd87ba71ab..18f07d96dc 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -1282,7 +1282,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,)) - txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,)) + txn.call_after(self._get_cached_user_device.invalidate, (user_id,)) txn.call_after( self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 5845322118..d1237c65cc 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -860,7 +860,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): not be deleted. """ txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + self.get_unread_event_push_actions_by_room_for_user.invalidate, (room_id, user_id), ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fd25c8112d..897fa06639 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -1748,9 +1748,9 @@ class PersistEventsStore: }, ) - txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,)) + txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,)) txn.call_after( - self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,) + self.store.get_aggregation_groups_for_event.invalidate, (parent_id,) ) if rel_type == RelationTypes.REPLACE: @@ -1903,7 +1903,7 @@ class PersistEventsStore: for user_id in user_ids: txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + self.store.get_unread_event_push_actions_by_room_for_user.invalidate, (room_id, user_id), ) @@ -1917,7 +1917,7 @@ class PersistEventsStore: def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + self.store.get_unread_event_push_actions_by_room_for_user.invalidate, (room_id,), ) txn.execute( diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6963bbf7f4..403a5ddaba 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@ from typing import ( Iterable, List, Optional, + Set, Tuple, overload, ) @@ -55,7 +56,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, get_domain_from_id -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -1045,32 +1046,74 @@ class EventsWorkerStore(SQLBaseStore): return {r["event_id"] for r in rows} - async def have_seen_events(self, event_ids): + async def have_seen_events( + self, room_id: str, event_ids: Iterable[str] + ) -> Set[str]: """Given a list of event ids, check if we have already processed them. + The room_id is only used to structure the cache (so that it can later be + invalidated by room_id) - there is no guarantee that the events are actually + in the room in question. + Args: - event_ids (iterable[str]): + room_id: Room we are polling + event_ids: events we are looking for Returns: set[str]: The events we have already seen. """ + res = await self._have_seen_events_dict( + (room_id, event_id) for event_id in event_ids + ) + return {eid for ((_rid, eid), have_event) in res.items() if have_event} + + @cachedList("have_seen_event", "keys") + async def _have_seen_events_dict( + self, keys: Iterable[Tuple[str, str]] + ) -> Dict[Tuple[str, str], bool]: + """Helper for have_seen_events + + Returns: + a dict {(room_id, event_id)-> bool} + """ # if the event cache contains the event, obviously we've seen it. - results = {x for x in event_ids if self._get_event_cache.contains(x)} - def have_seen_events_txn(txn, chunk): - sql = "SELECT event_id FROM events as e WHERE " + cache_results = { + (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,)) + } + results = {x: True for x in cache_results} + + def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]): + # we deliberately do *not* query the database for room_id, to make the + # query an index-only lookup on `events_event_id_key`. + # + # We therefore pull the events from the database into a set... + + sql = "SELECT event_id FROM events AS e WHERE " clause, args = make_in_list_sql_clause( - txn.database_engine, "e.event_id", chunk + txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk] ) txn.execute(sql + clause, args) - results.update(row[0] for row in txn) + found_events = {eid for eid, in txn} - for chunk in batch_iter((x for x in event_ids if x not in results), 100): + # ... and then we can update the results for each row in the batch + results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk}) + + # each batch requires its own index scan, so we make the batches as big as + # possible. + for chunk in batch_iter((k for k in keys if k not in cache_results), 500): await self.db_pool.runInteraction( "have_seen_events", have_seen_events_txn, chunk ) + return results + @cached(max_entries=100000, tree=True) + async def have_seen_event(self, room_id: str, event_id: str): + # this only exists for the benefit of the @cachedList descriptor on + # _have_seen_events_dict + raise NotImplementedError() + def _get_current_state_event_counts_txn(self, txn, room_id): """ See get_current_state_event_counts. diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index c584868188..2fa945d171 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py
@@ -143,6 +143,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "created_ts", "quarantined_by", "url_cache", + "safe_from_quarantine", ), allow_none=True, desc="get_local_media", @@ -296,12 +297,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_local_media", ) - async def mark_local_media_as_safe(self, media_id: str) -> None: - """Mark a local media as safe from quarantining.""" + async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None: + """Mark a local media as safe or unsafe from quarantining.""" await self.db_pool.simple_update_one( table="local_media_repository", keyvalues={"media_id": media_id}, - updatevalues={"safe_from_quarantine": True}, + updatevalues={"safe_from_quarantine": safe}, desc="mark_local_media_as_safe", ) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 6a2baa7841..1388771c40 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py
@@ -50,7 +50,7 @@ class PresenceStore(SQLBaseStore): instance_name=self._instance_name, tables=[("presence_stream", "instance_name", "stream_id")], sequence_name="presence_stream_sequence", - writers=hs.config.worker.writers.to_device, + writers=hs.config.worker.writers.presence, ) else: self._presence_id_gen = StreamIdGenerator( diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 8f83748b5e..7fb7780d0f 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@ import logging from typing import Any, List, Set, Tuple from synapse.api.errors import SynapseError -from synapse.storage._base import SQLBaseStore +from synapse.storage.databases.main import CacheInvalidationWorkerStore from synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) -class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): +class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): async def purge_history( self, room_id: str, token: str, delete_local_events: bool ) -> Set[int]: @@ -203,8 +203,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): "DELETE FROM event_to_state_groups " "WHERE event_id IN (SELECT event_id from events_to_purge)" ) - for event_id, _ in event_rows: - txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) # Delete all remote non-state events for table in ( @@ -283,6 +281,20 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): # so make sure to keep this actually last. txn.execute("DROP TABLE events_to_purge") + for event_id, should_delete in event_rows: + self._invalidate_cache_and_stream( + txn, self._get_state_group_for_event, (event_id,) + ) + + # XXX: This is racy, since have_seen_events could be called between the + # transaction completing and the invalidation running. On the other hand, + # that's no different to calling `have_seen_events` just before the + # event is deleted from the database. + if should_delete: + self._invalidate_cache_and_stream( + txn, self.have_seen_event, (room_id, event_id) + ) + logger.info("[purge] done") return referenced_state_groups @@ -422,7 +434,11 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): # index on them. In any case we should be clearing out 'stream' tables # periodically anyway (#5888) - # TODO: we could probably usefully do a bunch of cache invalidation here + # TODO: we could probably usefully do a bunch more cache invalidation here + + # XXX: as with purge_history, this is racy, but no worse than other races + # that already exist. + self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,)) logger.info("[purge] done") diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3647276acb..edeaacd7a6 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -460,7 +460,7 @@ class ReceiptsWorkerStore(SQLBaseStore): def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) - self._get_linearized_receipts_for_room.invalidate_many((room_id,)) + self._get_linearized_receipts_for_room.invalidate((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) @@ -659,9 +659,7 @@ class ReceiptsWorkerStore(SQLBaseStore): ) txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type)) # FIXME: This shouldn't invalidate the whole cache - txn.call_after( - self._get_linearized_receipts_for_room.invalidate_many, (room_id,) - ) + txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) self.db_pool.simple_delete_txn( txn, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index b4e3f052cc..bcf25f298e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -789,14 +789,15 @@ class RoomWorkerStore(SQLBaseStore): self, server_name: str, media_id: str, - quarantined_by: str, + quarantined_by: Optional[str], ) -> int: - """quarantines a single local or remote media id + """quarantines or unquarantines a single local or remote media id Args: server_name: The name of the server that holds this media media_id: The ID of the media to be quarantined quarantined_by: The user ID that initiated the quarantine request + If it is `None` media will be removed from quarantine """ logger.info("Quarantining media: %s/%s", server_name, media_id) is_local = server_name == self.config.server_name @@ -863,9 +864,9 @@ class RoomWorkerStore(SQLBaseStore): txn, local_mxcs: List[str], remote_mxcs: List[Tuple[str, str]], - quarantined_by: str, + quarantined_by: Optional[str], ) -> int: - """Quarantine local and remote media items + """Quarantine and unquarantine local and remote media items Args: txn (cursor) @@ -873,18 +874,27 @@ class RoomWorkerStore(SQLBaseStore): remote_mxcs: A list of (remote server, media id) tuples representing remote mxc URLs quarantined_by: The ID of the user who initiated the quarantine request + If it is `None` media will be removed from quarantine Returns: The total number of media items quarantined """ + # Update all the tables to set the quarantined_by flag - txn.executemany( - """ + sql = """ UPDATE local_media_repository SET quarantined_by = ? - WHERE media_id = ? AND safe_from_quarantine = ? - """, - ((quarantined_by, media_id, False) for media_id in local_mxcs), - ) + WHERE media_id = ? + """ + + # set quarantine + if quarantined_by is not None: + sql += "AND safe_from_quarantine = ?" + rows = [(quarantined_by, media_id, False) for media_id in local_mxcs] + # remove from quarantine + else: + rows = [(quarantined_by, media_id) for media_id in local_mxcs] + + txn.executemany(sql, rows) # Note that a rowcount of -1 can be used to indicate no rows were affected. total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0 @@ -1523,7 +1533,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): room_id: str, event_id: str, user_id: str, - reason: str, + reason: Optional[str], content: JsonDict, received_ts: int, ) -> None: