diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/admin_cmd.py | 4 | ||||
-rw-r--r-- | synapse/app/generic_worker.py | 4 | ||||
-rw-r--r-- | synapse/module_api/__init__.py | 6 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 7 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 14 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 7 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 7 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/34/cache_stream.py | 2 |
10 files changed, 22 insertions, 35 deletions
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index b05fe2c589..f9aada269a 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -64,7 +64,7 @@ from synapse.util.logcontext import LoggingContext logger = logging.getLogger("synapse.app.admin_cmd") -class AdminCmdSlavedStore( +class AdminCmdStore( FilteringWorkerStore, ClientIpWorkerStore, DeviceWorkerStore, @@ -103,7 +103,7 @@ class AdminCmdSlavedStore( class AdminCmdServer(HomeServer): - DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore + DATASTORE_CLASS = AdminCmdStore # type: ignore async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None: diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e17ce35b8e..909ebccf78 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -102,7 +102,7 @@ from synapse.util.httpresourcetree import create_resource_tree logger = logging.getLogger("synapse.app.generic_worker") -class GenericWorkerSlavedStore( +class GenericWorkerStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. UserDirectoryStore, @@ -154,7 +154,7 @@ class GenericWorkerSlavedStore( class GenericWorkerServer(HomeServer): - DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore + DATASTORE_CLASS = GenericWorkerStore # type: ignore def _listen_http(self, listener_config: ListenerConfig) -> None: assert listener_config.http_options is not None diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 2c9d181acf..0e9f366cba 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -134,7 +134,7 @@ from synapse.util.caches.descriptors import CachedFunction, cached as _cached from synapse.util.frozenutils import freeze if TYPE_CHECKING: - from synapse.app.generic_worker import GenericWorkerSlavedStore + from synapse.app.generic_worker import GenericWorkerStore from synapse.server import HomeServer @@ -237,9 +237,7 @@ class ModuleApi: # TODO: Fix this type hint once the types for the data stores have been ironed # out. - self._store: Union[ - DataStore, "GenericWorkerSlavedStore" - ] = hs.get_datastores().main + self._store: Union[DataStore, "GenericWorkerStore"] = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._auth = hs.get_auth() self._auth_handler = auth_handler diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 200f667fdf..139f57cf86 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -60,7 +60,7 @@ _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5 class ReplicationDataHandler: """Handles incoming stream updates from replication. - This instance notifies the slave data store about updates. Can be subclassed + This instance notifies the data store about updates. Can be subclassed to handle updates in additional ways. """ @@ -91,7 +91,7 @@ class ReplicationDataHandler: ) -> None: """Called to handle a batch of replication data with a given stream token. - By default this just pokes the slave store. Can be overridden in subclasses to + By default, this just pokes the data store. Can be overridden in subclasses to handle more. Args: diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index a9843f6e17..8f7bdbc61a 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) writers=hs.config.worker.writers.account_data, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index bd07d20171..46fa0a73f9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ cache_func = getattr(self, cache_name, None) @@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ txn.call_after(cache_func.invalidate, keys) @@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_all_cache_and_stream( self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: - """Invalidates the entire cache and adds it to the cache stream so slaves + """Invalidates the entire cache and adds it to the cache stream so other workers will know to invalidate their caches. """ diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 5503621ad6..a67fdb3c22 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): is_writer=hs.config.worker.worker_app is None, ) - # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a - # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 0ff3fc7369..53aa5933d5 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore): writers=hs.config.worker.writers.events, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 074942b167..5ee5c7ad9f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore): else: self._can_write_to_receipts = True + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._receipts_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/schema/main/delta/34/cache_stream.py b/synapse/storage/schema/main/delta/34/cache_stream.py index 682c86da1a..882f9b893b 100644 --- a/synapse/storage/schema/main/delta/34/cache_stream.py +++ b/synapse/storage/schema/main/delta/34/cache_stream.py @@ -21,7 +21,7 @@ from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) -# This stream is used to notify replication slaves that some caches have +# This stream is used to notify workers over replication that some caches have # been invalidated that they cannot infer from the other streams. CREATE_TABLE = """ CREATE TABLE cache_invalidation_stream ( |