diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index b05fe2c589..f9aada269a 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -64,7 +64,7 @@ from synapse.util.logcontext import LoggingContext
logger = logging.getLogger("synapse.app.admin_cmd")
-class AdminCmdSlavedStore(
+class AdminCmdStore(
FilteringWorkerStore,
ClientIpWorkerStore,
DeviceWorkerStore,
@@ -103,7 +103,7 @@ class AdminCmdSlavedStore(
class AdminCmdServer(HomeServer):
- DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore
+ DATASTORE_CLASS = AdminCmdStore # type: ignore
async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None:
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index e17ce35b8e..909ebccf78 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -102,7 +102,7 @@ from synapse.util.httpresourcetree import create_resource_tree
logger = logging.getLogger("synapse.app.generic_worker")
-class GenericWorkerSlavedStore(
+class GenericWorkerStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
@@ -154,7 +154,7 @@ class GenericWorkerSlavedStore(
class GenericWorkerServer(HomeServer):
- DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore
+ DATASTORE_CLASS = GenericWorkerStore # type: ignore
def _listen_http(self, listener_config: ListenerConfig) -> None:
assert listener_config.http_options is not None
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 2c9d181acf..0e9f366cba 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -134,7 +134,7 @@ from synapse.util.caches.descriptors import CachedFunction, cached as _cached
from synapse.util.frozenutils import freeze
if TYPE_CHECKING:
- from synapse.app.generic_worker import GenericWorkerSlavedStore
+ from synapse.app.generic_worker import GenericWorkerStore
from synapse.server import HomeServer
@@ -237,9 +237,7 @@ class ModuleApi:
# TODO: Fix this type hint once the types for the data stores have been ironed
# out.
- self._store: Union[
- DataStore, "GenericWorkerSlavedStore"
- ] = hs.get_datastores().main
+ self._store: Union[DataStore, "GenericWorkerStore"] = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._auth = hs.get_auth()
self._auth_handler = auth_handler
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 200f667fdf..139f57cf86 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -60,7 +60,7 @@ _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
class ReplicationDataHandler:
"""Handles incoming stream updates from replication.
- This instance notifies the slave data store about updates. Can be subclassed
+ This instance notifies the data store about updates. Can be subclassed
to handle updates in additional ways.
"""
@@ -91,7 +91,7 @@ class ReplicationDataHandler:
) -> None:
"""Called to handle a batch of replication data with a given stream token.
- By default this just pokes the slave store. Can be overridden in subclasses to
+ By default, this just pokes the data store. Can be overridden in subclasses to
handle more.
Args:
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index a9843f6e17..8f7bdbc61a 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
writers=hs.config.worker.writers.account_data,
)
else:
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index bd07d20171..46fa0a73f9 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
- """Invalidates the cache and adds it to the cache stream so slaves
+ """Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.
- This should only be used to invalidate caches where slaves won't
- otherwise know from other replication streams that the cache should
+ This should only be used to invalidate caches where other workers won't
+ otherwise have known from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
@@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
cache_func: CachedFunction,
keys: Tuple[Any, ...],
) -> None:
- """Invalidates the cache and adds it to the cache stream so slaves
+ """Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.
- This should only be used to invalidate caches where slaves won't
- otherwise know from other replication streams that the cache should
+ This should only be used to invalidate caches where other workers won't
+ otherwise have known from other replication streams that the cache should
be invalidated.
"""
txn.call_after(cache_func.invalidate, keys)
@@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
def _invalidate_all_cache_and_stream(
self, txn: LoggingTransaction, cache_func: CachedFunction
) -> None:
- """Invalidates the entire cache and adds it to the cache stream so slaves
+ """Invalidates the entire cache and adds it to the cache stream so other workers
will know to invalidate their caches.
"""
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5503621ad6..a67fdb3c22 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
is_writer=hs.config.worker.worker_app is None,
)
- # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
- # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 0ff3fc7369..53aa5933d5 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore):
writers=hs.config.worker.writers.events,
)
else:
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 074942b167..5ee5c7ad9f 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
self._can_write_to_receipts = True
+ # Multiple writers are not supported for SQLite.
+ #
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
- #
- # If this process is the writer than we need to use
- # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
- # updated over replication. (Multiple writers are not supported for
- # SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
diff --git a/synapse/storage/schema/main/delta/34/cache_stream.py b/synapse/storage/schema/main/delta/34/cache_stream.py
index 682c86da1a..882f9b893b 100644
--- a/synapse/storage/schema/main/delta/34/cache_stream.py
+++ b/synapse/storage/schema/main/delta/34/cache_stream.py
@@ -21,7 +21,7 @@ from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
-# This stream is used to notify replication slaves that some caches have
+# This stream is used to notify workers over replication that some caches have
# been invalidated that they cannot infer from the other streams.
CREATE_TABLE = """
CREATE TABLE cache_invalidation_stream (
|