diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 817d1f67f9..182cb2a1d8 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -37,7 +37,7 @@ class BaseSlavedStore(SQLBaseStore):
super(BaseSlavedStore, self).__init__(db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = SlavedIdTracker(
- db_conn, "cache_invalidation_stream", "stream_id",
+ db_conn, "cache_invalidation_stream", "stream_id"
)
else:
self._cache_id_gen = None
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index d9ba6d69b1..3c44d1d48d 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -21,10 +21,9 @@ from synapse.storage.tags import TagsWorkerStore
class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
-
def __init__(self, db_conn, hs):
self._account_data_id_gen = SlavedIdTracker(
- db_conn, "account_data_max_stream_id", "stream_id",
+ db_conn, "account_data_max_stream_id", "stream_id"
)
super(SlavedAccountDataStore, self).__init__(db_conn, hs)
@@ -45,24 +44,20 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved
self._account_data_id_gen.advance(token)
for row in rows:
self.get_tags_for_user.invalidate((row.user_id,))
- self._account_data_stream_cache.entity_has_changed(
- row.user_id, token
- )
+ self._account_data_stream_cache.entity_has_changed(row.user_id, token)
elif stream_name == "account_data":
self._account_data_id_gen.advance(token)
for row in rows:
if not row.room_id:
self.get_global_account_data_by_type_for_user.invalidate(
- (row.data_type, row.user_id,)
+ (row.data_type, row.user_id)
)
self.get_account_data_for_user.invalidate((row.user_id,))
- self.get_account_data_for_room.invalidate((row.user_id, row.room_id,))
+ self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
self.get_account_data_for_room_and_type.invalidate(
- (row.user_id, row.room_id, row.data_type,),
- )
- self._account_data_stream_cache.entity_has_changed(
- row.user_id, token
+ (row.user_id, row.room_id, row.data_type)
)
+ self._account_data_stream_cache.entity_has_changed(row.user_id, token)
return super(SlavedAccountDataStore, self).process_replication_rows(
stream_name, token, rows
)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index b53a4c6bd1..cda12ea70d 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -20,6 +20,7 @@ from synapse.storage.appservice import (
)
-class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
- ApplicationServiceWorkerStore):
+class SlavedApplicationServiceStore(
+ ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore
+):
pass
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 5b8521c770..14ced32333 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -25,9 +25,7 @@ class SlavedClientIpStore(BaseSlavedStore):
super(SlavedClientIpStore, self).__init__(db_conn, hs)
self.client_ip_last_seen = Cache(
- name="client_ip_last_seen",
- keylen=4,
- max_entries=50000 * CACHE_SIZE_FACTOR,
+ name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
)
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 4d59778863..284fd30d89 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -24,15 +24,15 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceInboxStore, self).__init__(db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
- db_conn, "device_max_stream_id", "stream_id",
+ db_conn, "device_max_stream_id", "stream_id"
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
- self._device_inbox_id_gen.get_current_token()
+ self._device_inbox_id_gen.get_current_token(),
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
- self._device_inbox_id_gen.get_current_token()
+ self._device_inbox_id_gen.get_current_token(),
)
self._last_device_delete_cache = ExpiringCache(
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 16c9a162c5..d9300fce33 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -27,14 +27,14 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
self.hs = hs
self._device_list_id_gen = SlavedIdTracker(
- db_conn, "device_lists_stream", "stream_id",
+ db_conn, "device_lists_stream", "stream_id"
)
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
- "DeviceListStreamChangeCache", device_list_max,
+ "DeviceListStreamChangeCache", device_list_max
)
self._device_list_federation_stream_cache = StreamChangeCache(
- "DeviceListFederationStreamChangeCache", device_list_max,
+ "DeviceListFederationStreamChangeCache", device_list_max
)
def stream_positions(self):
@@ -46,17 +46,13 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
if stream_name == "device_lists":
self._device_list_id_gen.advance(token)
for row in rows:
- self._invalidate_caches_for_devices(
- token, row.user_id, row.destination,
- )
+ self._invalidate_caches_for_devices(token, row.user_id, row.destination)
return super(SlavedDeviceStore, self).process_replication_rows(
stream_name, token, rows
)
def _invalidate_caches_for_devices(self, token, user_id, destination):
- self._device_list_stream_cache.entity_has_changed(
- user_id, token
- )
+ self._device_list_stream_cache.entity_has_changed(user_id, token)
if destination:
self._device_list_federation_stream_cache.entity_has_changed(
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a3952506c1..ab5937e638 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -45,21 +45,20 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(EventFederationWorkerStore,
- RoomMemberWorkerStore,
- EventPushActionsWorkerStore,
- StreamWorkerStore,
- StateGroupWorkerStore,
- EventsWorkerStore,
- SignatureWorkerStore,
- UserErasureWorkerStore,
- RelationsWorkerStore,
- BaseSlavedStore):
-
+class SlavedEventStore(
+ EventFederationWorkerStore,
+ RoomMemberWorkerStore,
+ EventPushActionsWorkerStore,
+ StreamWorkerStore,
+ StateGroupWorkerStore,
+ EventsWorkerStore,
+ SignatureWorkerStore,
+ UserErasureWorkerStore,
+ RelationsWorkerStore,
+ BaseSlavedStore,
+):
def __init__(self, db_conn, hs):
- self._stream_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering",
- )
+ self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
@@ -90,8 +89,13 @@ class SlavedEventStore(EventFederationWorkerStore,
self._backfill_id_gen.advance(-token)
for row in rows:
self.invalidate_caches_for_event(
- -token, row.event_id, row.room_id, row.type, row.state_key,
- row.redacts, row.relates_to,
+ -token,
+ row.event_id,
+ row.room_id,
+ row.type,
+ row.state_key,
+ row.redacts,
+ row.relates_to,
backfilled=True,
)
return super(SlavedEventStore, self).process_replication_rows(
@@ -103,41 +107,48 @@ class SlavedEventStore(EventFederationWorkerStore,
if row.type == EventsStreamEventRow.TypeId:
self.invalidate_caches_for_event(
- token, data.event_id, data.room_id, data.type, data.state_key,
- data.redacts, data.relates_to,
+ token,
+ data.event_id,
+ data.room_id,
+ data.type,
+ data.state_key,
+ data.redacts,
+ data.relates_to,
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
- (data.state_key, ),
+ (data.state_key,)
)
else:
- raise Exception("Unknown events stream row type %s" % (row.type, ))
-
- def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
- etype, state_key, redacts, relates_to,
- backfilled):
+ raise Exception("Unknown events stream row type %s" % (row.type,))
+
+ def invalidate_caches_for_event(
+ self,
+ stream_ordering,
+ event_id,
+ room_id,
+ etype,
+ state_key,
+ redacts,
+ relates_to,
+ backfilled,
+ ):
self._invalidate_get_event_cache(event_id)
self.get_latest_event_ids_in_room.invalidate((room_id,))
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
- (room_id,)
- )
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
if not backfilled:
- self._events_stream_cache.entity_has_changed(
- room_id, stream_ordering
- )
+ self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
if redacts:
self._invalidate_get_event_cache(redacts)
if etype == EventTypes.Member:
- self._membership_stream_cache.entity_has_changed(
- state_key, stream_ordering
- )
+ self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_user.invalidate((state_key,))
if relates_to:
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index e933b170bb..28a46edd28 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -27,10 +27,11 @@ class SlavedGroupServerStore(BaseSlavedStore):
self.hs = hs
self._group_updates_id_gen = SlavedIdTracker(
- db_conn, "local_group_updates", "stream_id",
+ db_conn, "local_group_updates", "stream_id"
)
self._group_updates_stream_cache = StreamChangeCache(
- "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
+ "_group_updates_stream_cache",
+ self._group_updates_id_gen.get_current_token(),
)
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
@@ -46,9 +47,7 @@ class SlavedGroupServerStore(BaseSlavedStore):
if stream_name == "groups":
self._group_updates_id_gen.advance(token)
for row in rows:
- self._group_updates_stream_cache.entity_has_changed(
- row.user_id, token
- )
+ self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
return super(SlavedGroupServerStore, self).process_replication_rows(
stream_name, token, rows
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 0ec1db25ce..82d808af4c 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -24,9 +24,7 @@ from ._slaved_id_tracker import SlavedIdTracker
class SlavedPresenceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedPresenceStore, self).__init__(db_conn, hs)
- self._presence_id_gen = SlavedIdTracker(
- db_conn, "presence_stream", "stream_id",
- )
+ self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")
self._presence_on_startup = self._get_active_presence(db_conn)
@@ -55,9 +53,7 @@ class SlavedPresenceStore(BaseSlavedStore):
if stream_name == "presence":
self._presence_id_gen.advance(token)
for row in rows:
- self.presence_stream_cache.entity_has_changed(
- row.user_id, token
- )
+ self.presence_stream_cache.entity_has_changed(row.user_id, token)
self._get_presence_for_user.invalidate((row.user_id,))
return super(SlavedPresenceStore, self).process_replication_rows(
stream_name, token, rows
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 45fc913c52..af7012702e 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -23,7 +23,7 @@ from .events import SlavedEventStore
class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
def __init__(self, db_conn, hs):
self._push_rules_stream_id_gen = SlavedIdTracker(
- db_conn, "push_rules_stream", "stream_id",
+ db_conn, "push_rules_stream", "stream_id"
)
super(SlavedPushRuleStore, self).__init__(db_conn, hs)
@@ -47,9 +47,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
for row in rows:
self.get_push_rules_for_user.invalidate((row.user_id,))
self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
- self.push_rules_stream_cache.entity_has_changed(
- row.user_id, token
- )
+ self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
return super(SlavedPushRuleStore, self).process_replication_rows(
stream_name, token, rows
)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 3b2213c0d4..8eeb267d61 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -21,12 +21,10 @@ from ._slaved_id_tracker import SlavedIdTracker
class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
-
def __init__(self, db_conn, hs):
super(SlavedPusherStore, self).__init__(db_conn, hs)
self._pushers_id_gen = SlavedIdTracker(
- db_conn, "pushers", "id",
- extra_tables=[("deleted_pushers", "stream_id")],
+ db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
def stream_positions(self):
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index ed12342f40..91afa5a72b 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -29,7 +29,6 @@ from ._slaved_id_tracker import SlavedIdTracker
class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
-
def __init__(self, db_conn, hs):
# We instantiate this first as the ReceiptsWorkerStore constructor
# needs to be able to call get_max_receipt_stream_id
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 0cb474928c..f68b3378e3 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -38,6 +38,4 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore):
if stream_name == "public_rooms":
self._public_room_id_gen.advance(token)
- return super(RoomStore, self).process_replication_rows(
- stream_name, token, rows
- )
+ return super(RoomStore, self).process_replication_rows(stream_name, token, rows)
|