summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/__init__.py48
-rw-r--r--synapse/storage/databases/main/account_data.py1
-rw-r--r--synapse/storage/databases/main/appservice.py1
-rw-r--r--synapse/storage/databases/main/cache.py1
-rw-r--r--synapse/storage/databases/main/censor_events.py1
-rw-r--r--synapse/storage/databases/main/client_ips.py1
-rw-r--r--synapse/storage/databases/main/deviceinbox.py1
-rw-r--r--synapse/storage/databases/main/devices.py24
-rw-r--r--synapse/storage/databases/main/directory.py1
-rw-r--r--synapse/storage/databases/main/e2e_room_keys.py1
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py1
-rw-r--r--synapse/storage/databases/main/event_federation.py4
-rw-r--r--synapse/storage/databases/main/event_push_actions.py1
-rw-r--r--synapse/storage/databases/main/events.py65
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py1
-rw-r--r--synapse/storage/databases/main/events_forward_extremities.py1
-rw-r--r--synapse/storage/databases/main/events_worker.py14
-rw-r--r--synapse/storage/databases/main/filtering.py1
-rw-r--r--synapse/storage/databases/main/group_server.py1
-rw-r--r--synapse/storage/databases/main/keys.py1
-rw-r--r--synapse/storage/databases/main/media_repository.py1
-rw-r--r--synapse/storage/databases/main/metrics.py1
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py1
-rw-r--r--synapse/storage/databases/main/presence.py93
-rw-r--r--synapse/storage/databases/main/profile.py1
-rw-r--r--synapse/storage/databases/main/purge_events.py1
-rw-r--r--synapse/storage/databases/main/push_rule.py1
-rw-r--r--synapse/storage/databases/main/pusher.py1
-rw-r--r--synapse/storage/databases/main/receipts.py1
-rw-r--r--synapse/storage/databases/main/registration.py13
-rw-r--r--synapse/storage/databases/main/rejections.py1
-rw-r--r--synapse/storage/databases/main/relations.py1
-rw-r--r--synapse/storage/databases/main/room.py1
-rw-r--r--synapse/storage/databases/main/roommember.py201
-rw-r--r--synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py1
-rw-r--r--synapse/storage/databases/main/schema/delta/57/local_current_membership.py1
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres20
-rw-r--r--synapse/storage/databases/main/search.py4
-rw-r--r--synapse/storage/databases/main/signatures.py1
-rw-r--r--synapse/storage/databases/main/state.py1
-rw-r--r--synapse/storage/databases/main/state_deltas.py1
-rw-r--r--synapse/storage/databases/main/stats.py1
-rw-r--r--synapse/storage/databases/main/stream.py5
-rw-r--r--synapse/storage/databases/main/tags.py1
-rw-r--r--synapse/storage/databases/main/transactions.py1
-rw-r--r--synapse/storage/databases/main/ui_auth.py1
-rw-r--r--synapse/storage/databases/main/user_directory.py1
-rw-r--r--synapse/storage/databases/main/user_erasure_store.py1
50 files changed, 347 insertions, 217 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py

index b3d16ca7ac..49c7606d51 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # Copyright 2019-2021 The Matrix.org Foundation C.I.C. @@ -18,7 +17,6 @@ import logging from typing import List, Optional, Tuple -from synapse.api.constants import PresenceState from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import DatabasePool from synapse.storage.databases.main.stats import UserSortOrder @@ -52,7 +50,7 @@ from .media_repository import MediaRepositoryStore from .metrics import ServerMetricsStore from .monthly_active_users import MonthlyActiveUsersStore from .openid import OpenIdStore -from .presence import PresenceStore, UserPresenceState +from .presence import PresenceStore from .profile import ProfileStore from .purge_events import PurgeEventsStore from .push_rule import PushRuleStore @@ -127,9 +125,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._presence_id_gen = StreamIdGenerator( - db_conn, "presence_stream", "stream_id" - ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) @@ -178,21 +173,6 @@ class DataStore( super().__init__(database, db_conn, hs) - self._presence_on_startup = self._get_active_presence(db_conn) - - presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( - db_conn, - "presence_stream", - entity_column="user_id", - stream_column="stream_id", - max_value=self._presence_id_gen.get_current_token(), - ) - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", - min_presence_val, - prefilled_cache=presence_cache_prefill, - ) - device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max @@ -239,32 +219,6 @@ class DataStore( def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - def take_presence_startup_info(self): - active_on_startup = self._presence_on_startup - self._presence_on_startup = None - return active_on_startup - - def _get_active_presence(self, db_conn): - """Fetch non-offline presence from the database so that we can register - the appropriate time outs. - """ - - sql = ( - "SELECT user_id, state, last_active_ts, last_federation_update_ts," - " last_user_sync_ts, status_msg, currently_active FROM presence_stream" - " WHERE state != ?" - ) - - txn = db_conn.cursor() - txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) - txn.close() - - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index a277a1ef13..1d02795f43 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85bb853d33..9f182c2a89 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 1e7637a6f5..ecc1f935e2 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py
index 3e26d5ba87..f22c1f241b 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index ea3c15fd0e..d60010e942 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 691080ce74..7c9d1f744e 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 9bf8ba888f..c9346de316 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. @@ -16,7 +15,7 @@ # limitations under the License. import abc import logging -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( @@ -32,7 +31,7 @@ from synapse.storage.database import ( LoggingTransaction, make_tuple_comparison_clause, ) -from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key +from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache @@ -718,7 +717,15 @@ class DeviceWorkerStore(SQLBaseStore): keyvalues={"user_id": user_id}, values={}, insertion_values={"added_ts": self._clock.time_msec()}, - desc="make_remote_user_device_cache_as_stale", + desc="mark_remote_user_device_cache_as_stale", + ) + + async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None: + # Remove the database entry that says we need to resync devices, after a resync + await self.db_pool.simple_delete( + table="device_lists_remote_resync", + keyvalues={"user_id": user_id}, + desc="mark_remote_user_device_cache_as_valid", ) async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None: @@ -1290,15 +1297,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): lock=False, ) - # If we're replacing the remote user's device list cache presumably - # we've done a full resync, so we remove the entry that says we need - # to resync - self.db_pool.simple_delete_txn( - txn, - table="device_lists_remote_resync", - keyvalues={"user_id": user_id}, - ) - async def add_device_change_to_streams( self, user_id: str, device_ids: Collection[str], hosts: List[str] ): diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py
index 267b948397..86075bc55b 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index 12cecceec2..b15fb71e62 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 New Vector Ltd # Copyright 2019 Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index f1e7859d26..88afe97c41 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index a956be491a..ff81d5cd17 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,7 +14,7 @@ import itertools import logging from queue import Empty, PriorityQueue -from typing import Dict, Iterable, List, Set, Tuple +from typing import Collection, Dict, Iterable, List, Set, Tuple from synapse.api.errors import StoreError from synapse.events import EventBase @@ -26,7 +25,6 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor -from synapse.types import Collection from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 78245ad5bd..5845322118 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ad17123915..fd25c8112d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. @@ -171,7 +170,7 @@ class PersistEventsStore: ) async with stream_ordering_manager as stream_orderings: - for (event, context), stream in zip(events_and_contexts, stream_orderings): + for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream await self.db_pool.runInteraction( @@ -298,7 +297,7 @@ class PersistEventsStore: txn.execute(sql + clause, args) to_recursively_check = [] - for event_id, prev_event_id, metadata, rejected in txn: + for _, prev_event_id, metadata, rejected in txn: if prev_event_id in existing_prevs: continue @@ -1128,7 +1127,7 @@ class PersistEventsStore: def _update_forward_extremities_txn( self, txn, new_forward_extremities, max_stream_order ): - for room_id, new_extrem in new_forward_extremities.items(): + for room_id in new_forward_extremities.keys(): self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) @@ -1379,24 +1378,28 @@ class PersistEventsStore: ], ) - for event, _ in events_and_contexts: - if not event.internal_metadata.is_redacted(): - # If we're persisting an unredacted event we go and ensure - # that we mark any redactions that reference this event as - # requiring censoring. - self.db_pool.simple_update_txn( - txn, - table="redactions", - keyvalues={"redacts": event.event_id}, - updatevalues={"have_censored": False}, + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?" + txn.execute_batch( + sql, + ( + ( + False, + event.event_id, ) + for event, _ in events_and_contexts + if not event.internal_metadata.is_redacted() + ), + ) state_events_and_contexts = [ ec for ec in events_and_contexts if ec[0].is_state() ] state_values = [] - for event, context in state_events_and_contexts: + for event, _ in state_events_and_contexts: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -1465,7 +1468,7 @@ class PersistEventsStore: # nothing to do here return - for event, context in events_and_contexts: + for event, _ in events_and_contexts: if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. @@ -1882,20 +1885,28 @@ class PersistEventsStore: ), ) - for event, _ in events_and_contexts: - user_ids = self.db_pool.simple_select_onecol_txn( - txn, - table="event_push_actions_staging", - keyvalues={"event_id": event.event_id}, - retcol="user_id", - ) + room_to_event_ids = {} # type: Dict[str, List[str]] + for e, _ in events_and_contexts: + room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) - for uid in user_ids: - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid), + for room_id, event_ids in room_to_event_ids.items(): + rows = self.db_pool.simple_select_many_txn( + txn, + table="event_push_actions_staging", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("user_id",), ) + user_ids = {row["user_id"] for row in rows} + + for user_id in user_ids: + txn.call_after( + self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id), + ) + # Now we delete the staging area for *all* events that were being # persisted. txn.execute_batch( diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 79e7df6ca9..cbe4be1437 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py
index b3703ae161..6d2688d711 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c00780969f..2c823e09cf 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,7 +15,16 @@ import logging import threading from collections import namedtuple -from typing import Container, Dict, Iterable, List, Optional, Tuple, overload +from typing import ( + Collection, + Container, + Dict, + Iterable, + List, + Optional, + Tuple, + overload, +) from constantly import NamedConstant, Names from typing_extensions import Literal @@ -46,7 +54,7 @@ from synapse.storage.database import DatabasePool from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.storage.util.sequence import build_sequence_generator -from synapse.types import Collection, JsonDict, get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index d2f5b9a502..bb244a03c0 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index bd7826f4e9..66ad363bfb 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index d504323b03..0e86807834 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd. # diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index b7820ac7ff..c584868188 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2020-2021 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 614a418a15..c3f551d377 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 757da3d55d..fe25638289 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 0ff693a310..db22fab23e 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,16 +12,69 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, List, Tuple -from synapse.api.presence import UserPresenceState +from synapse.api.presence import PresenceState, UserPresenceState +from synapse.replication.tcp.streams import PresenceStream from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause +from synapse.storage.database import DatabasePool +from synapse.storage.engines import PostgresEngine +from synapse.storage.types import Connection +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + class PresenceStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: Connection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._can_persist_presence = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + + if isinstance(database.engine, PostgresEngine): + self._presence_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="presence_stream", + instance_name=self._instance_name, + tables=[("presence_stream", "instance_name", "stream_id")], + sequence_name="presence_stream_sequence", + writers=hs.config.worker.writers.to_device, + ) + else: + self._presence_id_gen = StreamIdGenerator( + db_conn, "presence_stream", "stream_id" + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( + db_conn, + "presence_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=self._presence_id_gen.get_current_token(), + ) + self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", + min_presence_val, + prefilled_cache=presence_cache_prefill, + ) + async def update_presence(self, presence_states): + assert self._can_persist_presence + stream_ordering_manager = self._presence_id_gen.get_next_mult( len(presence_states) ) @@ -58,6 +110,7 @@ class PresenceStore(SQLBaseStore): "last_user_sync_ts": state.last_user_sync_ts, "status_msg": state.status_msg, "currently_active": state.currently_active, + "instance_name": self._instance_name, } for stream_id, state in zip(stream_orderings, presence_states) ], @@ -217,3 +270,37 @@ class PresenceStore(SQLBaseStore): def get_current_presence_token(self): return self._presence_id_gen.get_current_token() + + def _get_active_presence(self, db_conn: Connection): + """Fetch non-offline presence from the database so that we can register + the appropriate time outs. + """ + + sql = ( + "SELECT user_id, state, last_active_ts, last_federation_update_ts," + " last_user_sync_ts, status_msg, currently_active FROM presence_stream" + " WHERE state != ?" + ) + + txn = db_conn.cursor() + txn.execute(sql, (PresenceState.OFFLINE,)) + rows = self.db_pool.cursor_to_dict(txn) + txn.close() + + for row in rows: + row["currently_active"] = bool(row["currently_active"]) + + return [UserPresenceState(**row) for row in rows] + + def take_presence_startup_info(self): + active_on_startup = self._presence_on_startup + self._presence_on_startup = None + return active_on_startup + + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == PresenceStream.NAME: + self._presence_id_gen.advance(instance_name, token) + for row in rows: + self.presence_stream_cache.entity_has_changed(row.user_id, token) + self._get_presence_for_user.invalidate((row.user_id,)) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 1e65cb8a04..1a34118f7e 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 41f4fe7f95..8f83748b5e 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 9e58dc0e6a..db52176337 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index c65558c280..b48fe086d4 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 43c852c96c..3647276acb 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index c439b58db6..d36b18a0e9 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017-2018 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. @@ -92,11 +91,17 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): id_column=None, ) - self._account_validity_enabled = hs.config.account_validity_enabled + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) + self._account_validity_period = None + self._account_validity_startup_job_max_delta = None if self._account_validity_enabled: - self._account_validity_period = hs.config.account_validity_period + self._account_validity_period = ( + hs.config.account_validity.account_validity_period + ) self._account_validity_startup_job_max_delta = ( - hs.config.account_validity_startup_job_max_delta + hs.config.account_validity.account_validity_startup_job_max_delta ) if hs.config.run_background_tasks: diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py
index 1e361aaa9a..167318b314 100644 --- a/synapse/storage/databases/main/rejections.py +++ b/synapse/storage/databases/main/rejections.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 5cd61547f7..2bbf6d6a95 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index a76e9ae2e7..b4e3f052cc 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index a9216ca9ae..2a8532f8c1 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # @@ -14,7 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) + +import attr from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase @@ -34,7 +46,7 @@ from synapse.storage.roommember import ( ProfileInfo, RoomsForUser, ) -from synapse.types import Collection, PersistedEventPosition, get_domain_from_id +from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -54,6 +66,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) + # Used by `_get_joined_hosts` to ensure only one thing mutates the cache + # at a time. Keyed by room_id. + self._joined_host_linearizer = Linearizer("_JoinedHostsCache") + # Is the current_state_events.membership up to date? Or is the # background update still running? self._current_state_events_membership_up_to_date = False @@ -174,6 +190,33 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (room_id, Membership.JOIN)) return [r[0] for r in txn] + @cached(max_entries=100000, iterable=True) + async def get_users_in_room_with_profiles( + self, room_id: str + ) -> Dict[str, ProfileInfo]: + """Get a mapping from user ID to profile information for all users in a given room. + + Args: + room_id: The ID of the room to retrieve the users of. + + Returns: + A mapping from user ID to ProfileInfo. + """ + + def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: + sql = """ + SELECT user_id, display_name, avatar_url FROM room_memberships + WHERE room_id = ? AND membership = ? + """ + txn.execute(sql, (room_id, Membership.JOIN)) + + return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn} + + return await self.db_pool.runInteraction( + "get_users_in_room_with_profiles", + _get_users_in_room_with_profiles, + ) + @cached(max_entries=100000) async def get_room_summary(self, room_id: str) -> Dict[str, MemberSummary]: """Get the details of a room roughly suitable for use by the room @@ -704,19 +747,82 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(num_args=2, max_entries=10000, iterable=True) async def _get_joined_hosts( - self, room_id, state_group, current_state_ids, state_entry - ): - # We don't use `state_group`, its there so that we can cache based - # on it. However, its important that its never None, since two current_state's - # with a state_group of None are likely to be different. + self, + room_id: str, + state_group: int, + current_state_ids: StateMap[str], + state_entry: "_StateCacheEntry", + ) -> FrozenSet[str]: + # We don't use `state_group`, its there so that we can cache based on + # it. However, its important that its never None, since two + # current_state's with a state_group of None are likely to be different. + # + # The `state_group` must match the `state_entry.state_group` (if not None). assert state_group is not None - + assert state_entry.state_group is None or state_entry.state_group == state_group + + # We use a secondary cache of previous work to allow us to build up the + # joined hosts for the given state group based on previous state groups. + # + # We cache one object per room containing the results of the last state + # group we got joined hosts for. The idea is that generally + # `get_joined_hosts` is called with the "current" state group for the + # room, and so consecutive calls will be for consecutive state groups + # which point to the previous state group. cache = await self._get_joined_hosts_cache(room_id) - return await cache.get_destinations(state_entry) + + # If the state group in the cache matches, we already have the data we need. + if state_entry.state_group == cache.state_group: + return frozenset(cache.hosts_to_joined_users) + + # Since we'll mutate the cache we need to lock. + with (await self._joined_host_linearizer.queue(room_id)): + if state_entry.state_group == cache.state_group: + # Same state group, so nothing to do. We've already checked for + # this above, but the cache may have changed while waiting on + # the lock. + pass + elif state_entry.prev_group == cache.state_group: + # The cached work is for the previous state group, so we work out + # the delta. + for (typ, state_key), event_id in state_entry.delta_ids.items(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = cache.hosts_to_joined_users.setdefault(host, set()) + + event = await self.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + cache.hosts_to_joined_users.pop(host, None) + else: + # The cache doesn't match the state group or prev state group, + # so we calculate the result from first principles. + joined_users = await self.get_joined_users_from_state( + room_id, state_entry + ) + + cache.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + if state_entry.state_group: + cache.state_group = state_entry.state_group + else: + cache.state_group = object() + + return frozenset(cache.hosts_to_joined_users) @cached(max_entries=10000) def _get_joined_hosts_cache(self, room_id: str) -> "_JoinedHostsCache": - return _JoinedHostsCache(self, room_id) + return _JoinedHostsCache() @cached(num_args=2) async def did_forget(self, user_id: str, room_id: str) -> bool: @@ -1026,71 +1132,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) +@attr.s(slots=True) class _JoinedHostsCache: - """Cache for joined hosts in a room that is optimised to handle updates - via state deltas. - """ - - def __init__(self, store, room_id): - self.store = store - self.room_id = room_id - - self.hosts_to_joined_users = {} - - self.state_group = object() + """The cached data used by the `_get_joined_hosts_cache`.""" - self.linearizer = Linearizer("_JoinedHostsCache") + # Dict of host to the set of their users in the room at the state group. + hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict) - self._len = 0 - - async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]: - """Get set of destinations for a state entry - - Args: - state_entry - - Returns: - The destinations as a set. - """ - if state_entry.state_group == self.state_group: - return frozenset(self.hosts_to_joined_users) - - with (await self.linearizer.queue(())): - if state_entry.state_group == self.state_group: - pass - elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in state_entry.delta_ids.items(): - if typ != EventTypes.Member: - continue - - host = intern_string(get_domain_from_id(state_key)) - user_id = state_key - known_joins = self.hosts_to_joined_users.setdefault(host, set()) - - event = await self.store.get_event(event_id) - if event.membership == Membership.JOIN: - known_joins.add(user_id) - else: - known_joins.discard(user_id) - - if not known_joins: - self.hosts_to_joined_users.pop(host, None) - else: - joined_users = await self.store.get_joined_users_from_state( - self.room_id, state_entry - ) - - self.hosts_to_joined_users = {} - for user_id in joined_users: - host = intern_string(get_domain_from_id(user_id)) - self.hosts_to_joined_users.setdefault(host, set()).add(user_id) - - if state_entry.state_group: - self.state_group = state_entry.state_group - else: - self.state_group = object() - self._len = sum(len(v) for v in self.hosts_to_joined_users.values()) - return frozenset(self.hosts_to_joined_users) + # The state group `hosts_to_joined_users` is derived from. Will be an object + # if the instance is newly created or if the state is not based on a state + # group. (An object is used as a sentinel value to ensure that it never is + # equal to anything else). + state_group = attr.ib(type=Union[object, int], factory=object) def __len__(self): - return self._len + return sum(len(v) for v in self.hosts_to_joined_users.values()) diff --git a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py
index b1684a8441..acd6ad1e1f 100644 --- a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py
index 44917f0a2e..66989222e6 100644 --- a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py +++ b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql new file mode 100644
index 0000000000..4836dac16e --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql
@@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Track when users renew their account using the value of the 'renewal_token' column. +-- This field should be set to NULL after a fresh token is generated. +ALTER TABLE account_validity ADD token_used_ts_ms BIGINT; diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql new file mode 100644
index 0000000000..b6ba0bda1a --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
@@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column to specify which instance wrote the row. Historic rows have +-- `NULL`, which indicates that the master instance wrote them. +ALTER TABLE presence_stream ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres new file mode 100644
index 0000000000..02b182adf9 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
@@ -0,0 +1,20 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence; + +SELECT setval('presence_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream +)); diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f5e7d9ef98..6480d5a9f5 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,7 +15,7 @@ import logging import re from collections import namedtuple -from typing import List, Optional, Set +from typing import Collection, List, Optional, Set from synapse.api.errors import SynapseError from synapse.events import EventBase @@ -24,7 +23,6 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import Collection logger = logging.getLogger(__name__) diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py
index c8c67953e4..ab2159c2d3 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 93431efe00..1757064a68 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2020 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 0dbb501f16..bff7d0404f 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b33c93da2d..82a1833509 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 91f8abb67d..7581c7d3ff 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd # Copyright 2018-2019 New Vector Ltd @@ -38,7 +37,7 @@ what sort order was used: import abc import logging from collections import namedtuple -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple from twisted.internet import defer @@ -54,7 +53,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import Collection, PersistedEventPosition, RoomStreamToken +from synapse.types import PersistedEventPosition, RoomStreamToken from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 50067eabfc..1d62c6140f 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index b7072f1f5e..82335e7a9d 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index 5473ec1485..22c05cdde7 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 1026f321e5..7a082fdd21 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py
index f9575b1f1f..acf6b2fb64 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License");