summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/7768.misc1
-rw-r--r--synapse/replication/slave/storage/account_data.py5
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py3
-rw-r--r--synapse/replication/slave/storage/groups.py3
-rw-r--r--synapse/replication/slave/storage/presence.py3
-rw-r--r--synapse/replication/slave/storage/push_rule.py3
-rw-r--r--synapse/replication/slave/storage/pushers.py3
-rw-r--r--synapse/replication/slave/storage/receipts.py11
-rw-r--r--synapse/replication/slave/storage/room.py3
-rw-r--r--synapse/storage/data_stores/main/cache.py8
-rw-r--r--synapse/storage/data_stores/main/events_worker.py6
11 files changed, 27 insertions, 22 deletions
diff --git a/changelog.d/7768.misc b/changelog.d/7768.misc
new file mode 100644
index 0000000000..dfb3d24c7d
--- /dev/null
+++ b/changelog.d/7768.misc
@@ -0,0 +1 @@
+Use symbolic names for replication stream names.
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 9db6c62bc7..525b94fd87 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -16,6 +16,7 @@
 
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
 from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
 from synapse.storage.data_stores.main.tags import TagsWorkerStore
 from synapse.storage.database import Database
@@ -39,12 +40,12 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved
         return self._account_data_id_gen.get_current_token()
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "tag_account_data":
+        if stream_name == TagAccountDataStream.NAME:
             self._account_data_id_gen.advance(token)
             for row in rows:
                 self.get_tags_for_user.invalidate((row.user_id,))
                 self._account_data_stream_cache.entity_has_changed(row.user_id, token)
-        elif stream_name == "account_data":
+        elif stream_name == AccountDataStream.NAME:
             self._account_data_id_gen.advance(token)
             for row in rows:
                 if not row.room_id:
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 6e7fd259d4..bd394f6b00 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -15,6 +15,7 @@
 
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import ToDeviceStream
 from synapse.storage.data_stores.main.deviceinbox import DeviceInboxWorkerStore
 from synapse.storage.database import Database
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -44,7 +45,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
         )
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "to_device":
+        if stream_name == ToDeviceStream.NAME:
             self._device_inbox_id_gen.advance(token)
             for row in rows:
                 if row.entity.startswith("@"):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 1851e7d525..5d210fa3a1 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -15,6 +15,7 @@
 
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import GroupServerStream
 from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore
 from synapse.storage.database import Database
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -38,7 +39,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
         return self._group_updates_id_gen.get_current_token()
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "groups":
+        if stream_name == GroupServerStream.NAME:
             self._group_updates_id_gen.advance(token)
             for row in rows:
                 self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 4e0124842d..2938cb8e43 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.replication.tcp.streams import PresenceStream
 from synapse.storage import DataStore
 from synapse.storage.data_stores.main.presence import PresenceStore
 from synapse.storage.database import Database
@@ -42,7 +43,7 @@ class SlavedPresenceStore(BaseSlavedStore):
         return self._presence_id_gen.get_current_token()
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "presence":
+        if stream_name == PresenceStream.NAME:
             self._presence_id_gen.advance(token)
             for row in rows:
                 self.presence_stream_cache.entity_has_changed(row.user_id, token)
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 6adb19463a..23ec1c5b11 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.replication.tcp.streams import PushRulesStream
 from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
 
 from .events import SlavedEventStore
@@ -30,7 +31,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
         return self._push_rules_stream_id_gen.get_current_token()
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "push_rules":
+        if stream_name == PushRulesStream.NAME:
             self._push_rules_stream_id_gen.advance(token)
             for row in rows:
                 self.get_push_rules_for_user.invalidate((row.user_id,))
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index cb78b49acb..ff449f3658 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.replication.tcp.streams import PushersStream
 from synapse.storage.data_stores.main.pusher import PusherWorkerStore
 from synapse.storage.database import Database
 
@@ -32,6 +33,6 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
         return self._pushers_id_gen.get_current_token()
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "pushers":
+        if stream_name == PushersStream.NAME:
             self._pushers_id_gen.advance(token)
         return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index be716cc558..6982686eb5 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -14,20 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
 from synapse.storage.database import Database
 
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-# So, um, we want to borrow a load of functions intended for reading from
-# a DataStore, but we don't want to take functions that either write to the
-# DataStore or are cached and don't have cache invalidation logic.
-#
-# Rather than write duplicate versions of those functions, or lift them to
-# a common base class, we going to grab the underlying __func__ object from
-# the method descriptor on the DataStore and chuck them into our class.
-
 
 class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
@@ -52,7 +45,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
         self.get_receipts_for_room.invalidate((room_id, receipt_type))
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "receipts":
+        if stream_name == ReceiptsStream.NAME:
             self._receipts_id_gen.advance(token)
             for row in rows:
                 self.invalidate_caches_for_receipt(
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 8873bf37e5..8710207ada 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.replication.tcp.streams import PublicRoomsStream
 from synapse.storage.data_stores.main.room import RoomWorkerStore
 from synapse.storage.database import Database
 
@@ -31,7 +32,7 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore):
         return self._public_room_id_gen.get_current_token()
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "public_rooms":
+        if stream_name == PublicRoomsStream.NAME:
             self._public_room_id_gen.advance(token)
 
         return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index eac5a4e55b..d30766e543 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -19,7 +19,9 @@ import logging
 from typing import Any, Iterable, Optional, Tuple
 
 from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams import BackfillStream, CachesStream
 from synapse.replication.tcp.streams.events import (
+    EventsStream,
     EventsStreamCurrentStateRow,
     EventsStreamEventRow,
 )
@@ -71,10 +73,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         )
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
+        if stream_name == EventsStream.NAME:
             for row in rows:
                 self._process_event_stream_row(token, row)
-        elif stream_name == "backfill":
+        elif stream_name == BackfillStream.NAME:
             for row in rows:
                 self._invalidate_caches_for_event(
                     -token,
@@ -86,7 +88,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                     row.relates_to,
                     backfilled=True,
                 )
-        elif stream_name == "caches":
+        elif stream_name == CachesStream.NAME:
             if self._cache_id_gen:
                 self._cache_id_gen.advance(instance_name, token)
 
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index a48c7a96ca..47a3e63589 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -38,6 +38,8 @@ from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import BackfillStream
+from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
@@ -113,9 +115,9 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_ongoing = 0
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
+        if stream_name == EventsStream.NAME:
             self._stream_id_gen.advance(token)
-        elif stream_name == "backfill":
+        elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(-token)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)