summary refs log tree commit diff
path: root/synapse/replication/slave
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/slave')
-rw-r--r--synapse/replication/slave/storage/_base.py2
-rw-r--r--synapse/replication/slave/storage/account_data.py17
-rw-r--r--synapse/replication/slave/storage/appservice.py5
-rw-r--r--synapse/replication/slave/storage/client_ips.py4
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py6
-rw-r--r--synapse/replication/slave/storage/devices.py14
-rw-r--r--synapse/replication/slave/storage/events.py77
-rw-r--r--synapse/replication/slave/storage/groups.py9
-rw-r--r--synapse/replication/slave/storage/presence.py8
-rw-r--r--synapse/replication/slave/storage/push_rule.py6
-rw-r--r--synapse/replication/slave/storage/pushers.py4
-rw-r--r--synapse/replication/slave/storage/receipts.py1
-rw-r--r--synapse/replication/slave/storage/room.py4
13 files changed, 73 insertions, 84 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 817d1f67f9..182cb2a1d8 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -37,7 +37,7 @@ class BaseSlavedStore(SQLBaseStore):
         super(BaseSlavedStore, self).__init__(db_conn, hs)
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = SlavedIdTracker(
-                db_conn, "cache_invalidation_stream", "stream_id",
+                db_conn, "cache_invalidation_stream", "stream_id"
             )
         else:
             self._cache_id_gen = None
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index d9ba6d69b1..3c44d1d48d 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -21,10 +21,9 @@ from synapse.storage.tags import TagsWorkerStore
 
 
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
-
     def __init__(self, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id",
+            db_conn, "account_data_max_stream_id", "stream_id"
         )
 
         super(SlavedAccountDataStore, self).__init__(db_conn, hs)
@@ -45,24 +44,20 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved
             self._account_data_id_gen.advance(token)
             for row in rows:
                 self.get_tags_for_user.invalidate((row.user_id,))
-                self._account_data_stream_cache.entity_has_changed(
-                    row.user_id, token
-                )
+                self._account_data_stream_cache.entity_has_changed(row.user_id, token)
         elif stream_name == "account_data":
             self._account_data_id_gen.advance(token)
             for row in rows:
                 if not row.room_id:
                     self.get_global_account_data_by_type_for_user.invalidate(
-                        (row.data_type, row.user_id,)
+                        (row.data_type, row.user_id)
                     )
                 self.get_account_data_for_user.invalidate((row.user_id,))
-                self.get_account_data_for_room.invalidate((row.user_id, row.room_id,))
+                self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
                 self.get_account_data_for_room_and_type.invalidate(
-                    (row.user_id, row.room_id, row.data_type,),
-                )
-                self._account_data_stream_cache.entity_has_changed(
-                    row.user_id, token
+                    (row.user_id, row.room_id, row.data_type)
                 )
+                self._account_data_stream_cache.entity_has_changed(row.user_id, token)
         return super(SlavedAccountDataStore, self).process_replication_rows(
             stream_name, token, rows
         )
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index b53a4c6bd1..cda12ea70d 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -20,6 +20,7 @@ from synapse.storage.appservice import (
 )
 
 
-class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
-                                    ApplicationServiceWorkerStore):
+class SlavedApplicationServiceStore(
+    ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore
+):
     pass
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 5b8521c770..14ced32333 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -25,9 +25,7 @@ class SlavedClientIpStore(BaseSlavedStore):
         super(SlavedClientIpStore, self).__init__(db_conn, hs)
 
         self.client_ip_last_seen = Cache(
-            name="client_ip_last_seen",
-            keylen=4,
-            max_entries=50000 * CACHE_SIZE_FACTOR,
+            name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
         )
 
     def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 4d59778863..284fd30d89 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -24,15 +24,15 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(SlavedDeviceInboxStore, self).__init__(db_conn, hs)
         self._device_inbox_id_gen = SlavedIdTracker(
-            db_conn, "device_max_stream_id", "stream_id",
+            db_conn, "device_max_stream_id", "stream_id"
         )
         self._device_inbox_stream_cache = StreamChangeCache(
             "DeviceInboxStreamChangeCache",
-            self._device_inbox_id_gen.get_current_token()
+            self._device_inbox_id_gen.get_current_token(),
         )
         self._device_federation_outbox_stream_cache = StreamChangeCache(
             "DeviceFederationOutboxStreamChangeCache",
-            self._device_inbox_id_gen.get_current_token()
+            self._device_inbox_id_gen.get_current_token(),
         )
 
         self._last_device_delete_cache = ExpiringCache(
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 16c9a162c5..d9300fce33 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -27,14 +27,14 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
         self.hs = hs
 
         self._device_list_id_gen = SlavedIdTracker(
-            db_conn, "device_lists_stream", "stream_id",
+            db_conn, "device_lists_stream", "stream_id"
         )
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
-            "DeviceListStreamChangeCache", device_list_max,
+            "DeviceListStreamChangeCache", device_list_max
         )
         self._device_list_federation_stream_cache = StreamChangeCache(
-            "DeviceListFederationStreamChangeCache", device_list_max,
+            "DeviceListFederationStreamChangeCache", device_list_max
         )
 
     def stream_positions(self):
@@ -46,17 +46,13 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
         if stream_name == "device_lists":
             self._device_list_id_gen.advance(token)
             for row in rows:
-                self._invalidate_caches_for_devices(
-                    token, row.user_id, row.destination,
-                )
+                self._invalidate_caches_for_devices(token, row.user_id, row.destination)
         return super(SlavedDeviceStore, self).process_replication_rows(
             stream_name, token, rows
         )
 
     def _invalidate_caches_for_devices(self, token, user_id, destination):
-        self._device_list_stream_cache.entity_has_changed(
-            user_id, token
-        )
+        self._device_list_stream_cache.entity_has_changed(user_id, token)
 
         if destination:
             self._device_list_federation_stream_cache.entity_has_changed(
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a3952506c1..ab5937e638 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -45,21 +45,20 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(EventFederationWorkerStore,
-                       RoomMemberWorkerStore,
-                       EventPushActionsWorkerStore,
-                       StreamWorkerStore,
-                       StateGroupWorkerStore,
-                       EventsWorkerStore,
-                       SignatureWorkerStore,
-                       UserErasureWorkerStore,
-                       RelationsWorkerStore,
-                       BaseSlavedStore):
-
+class SlavedEventStore(
+    EventFederationWorkerStore,
+    RoomMemberWorkerStore,
+    EventPushActionsWorkerStore,
+    StreamWorkerStore,
+    StateGroupWorkerStore,
+    EventsWorkerStore,
+    SignatureWorkerStore,
+    UserErasureWorkerStore,
+    RelationsWorkerStore,
+    BaseSlavedStore,
+):
     def __init__(self, db_conn, hs):
-        self._stream_id_gen = SlavedIdTracker(
-            db_conn, "events", "stream_ordering",
-        )
+        self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
         self._backfill_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering", step=-1
         )
@@ -90,8 +89,13 @@ class SlavedEventStore(EventFederationWorkerStore,
             self._backfill_id_gen.advance(-token)
             for row in rows:
                 self.invalidate_caches_for_event(
-                    -token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts, row.relates_to,
+                    -token,
+                    row.event_id,
+                    row.room_id,
+                    row.type,
+                    row.state_key,
+                    row.redacts,
+                    row.relates_to,
                     backfilled=True,
                 )
         return super(SlavedEventStore, self).process_replication_rows(
@@ -103,41 +107,48 @@ class SlavedEventStore(EventFederationWorkerStore,
 
         if row.type == EventsStreamEventRow.TypeId:
             self.invalidate_caches_for_event(
-                token, data.event_id, data.room_id, data.type, data.state_key,
-                data.redacts, data.relates_to,
+                token,
+                data.event_id,
+                data.room_id,
+                data.type,
+                data.state_key,
+                data.redacts,
+                data.relates_to,
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
             if data.type == EventTypes.Member:
                 self.get_rooms_for_user_with_stream_ordering.invalidate(
-                    (data.state_key, ),
+                    (data.state_key,)
                 )
         else:
-            raise Exception("Unknown events stream row type %s" % (row.type, ))
-
-    def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
-                                    etype, state_key, redacts, relates_to,
-                                    backfilled):
+            raise Exception("Unknown events stream row type %s" % (row.type,))
+
+    def invalidate_caches_for_event(
+        self,
+        stream_ordering,
+        event_id,
+        room_id,
+        etype,
+        state_key,
+        redacts,
+        relates_to,
+        backfilled,
+    ):
         self._invalidate_get_event_cache(event_id)
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-            (room_id,)
-        )
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
 
         if not backfilled:
-            self._events_stream_cache.entity_has_changed(
-                room_id, stream_ordering
-            )
+            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
 
         if redacts:
             self._invalidate_get_event_cache(redacts)
 
         if etype == EventTypes.Member:
-            self._membership_stream_cache.entity_has_changed(
-                state_key, stream_ordering
-            )
+            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
             self.get_invited_rooms_for_user.invalidate((state_key,))
 
         if relates_to:
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index e933b170bb..28a46edd28 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -27,10 +27,11 @@ class SlavedGroupServerStore(BaseSlavedStore):
         self.hs = hs
 
         self._group_updates_id_gen = SlavedIdTracker(
-            db_conn, "local_group_updates", "stream_id",
+            db_conn, "local_group_updates", "stream_id"
         )
         self._group_updates_stream_cache = StreamChangeCache(
-            "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
+            "_group_updates_stream_cache",
+            self._group_updates_id_gen.get_current_token(),
         )
 
     get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
@@ -46,9 +47,7 @@ class SlavedGroupServerStore(BaseSlavedStore):
         if stream_name == "groups":
             self._group_updates_id_gen.advance(token)
             for row in rows:
-                self._group_updates_stream_cache.entity_has_changed(
-                    row.user_id, token
-                )
+                self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
 
         return super(SlavedGroupServerStore, self).process_replication_rows(
             stream_name, token, rows
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 0ec1db25ce..82d808af4c 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -24,9 +24,7 @@ from ._slaved_id_tracker import SlavedIdTracker
 class SlavedPresenceStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(SlavedPresenceStore, self).__init__(db_conn, hs)
-        self._presence_id_gen = SlavedIdTracker(
-            db_conn, "presence_stream", "stream_id",
-        )
+        self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")
 
         self._presence_on_startup = self._get_active_presence(db_conn)
 
@@ -55,9 +53,7 @@ class SlavedPresenceStore(BaseSlavedStore):
         if stream_name == "presence":
             self._presence_id_gen.advance(token)
             for row in rows:
-                self.presence_stream_cache.entity_has_changed(
-                    row.user_id, token
-                )
+                self.presence_stream_cache.entity_has_changed(row.user_id, token)
                 self._get_presence_for_user.invalidate((row.user_id,))
         return super(SlavedPresenceStore, self).process_replication_rows(
             stream_name, token, rows
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 45fc913c52..af7012702e 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -23,7 +23,7 @@ from .events import SlavedEventStore
 class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
     def __init__(self, db_conn, hs):
         self._push_rules_stream_id_gen = SlavedIdTracker(
-            db_conn, "push_rules_stream", "stream_id",
+            db_conn, "push_rules_stream", "stream_id"
         )
         super(SlavedPushRuleStore, self).__init__(db_conn, hs)
 
@@ -47,9 +47,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
             for row in rows:
                 self.get_push_rules_for_user.invalidate((row.user_id,))
                 self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
-                self.push_rules_stream_cache.entity_has_changed(
-                    row.user_id, token
-                )
+                self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
         return super(SlavedPushRuleStore, self).process_replication_rows(
             stream_name, token, rows
         )
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 3b2213c0d4..8eeb267d61 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -21,12 +21,10 @@ from ._slaved_id_tracker import SlavedIdTracker
 
 
 class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
-
     def __init__(self, db_conn, hs):
         super(SlavedPusherStore, self).__init__(db_conn, hs)
         self._pushers_id_gen = SlavedIdTracker(
-            db_conn, "pushers", "id",
-            extra_tables=[("deleted_pushers", "stream_id")],
+            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
         )
 
     def stream_positions(self):
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ed12342f40..91afa5a72b 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -29,7 +29,6 @@ from ._slaved_id_tracker import SlavedIdTracker
 
 
 class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
-
     def __init__(self, db_conn, hs):
         # We instantiate this first as the ReceiptsWorkerStore constructor
         # needs to be able to call get_max_receipt_stream_id
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 0cb474928c..f68b3378e3 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -38,6 +38,4 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore):
         if stream_name == "public_rooms":
             self._public_room_id_gen.advance(token)
 
-        return super(RoomStore, self).process_replication_rows(
-            stream_name, token, rows
-        )
+        return super(RoomStore, self).process_replication_rows(stream_name, token, rows)