diff options
Diffstat (limited to 'synapse/replication/slave')
-rw-r--r-- | synapse/replication/slave/storage/events.py | 27 | ||||
-rw-r--r-- | synapse/replication/slave/storage/pushers.py | 52 | ||||
-rw-r--r-- | synapse/replication/slave/storage/receipts.py | 61 |
3 files changed, 136 insertions, 4 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cfc728a038..86f00b6ff5 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -21,6 +21,7 @@ from synapse.storage import DataStore from synapse.storage.room import RoomStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore +from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.state import StateStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -68,7 +69,19 @@ class SlavedEventStore(BaseSlavedStore): _get_current_state_for_key = StateStore.__dict__[ "_get_current_state_for_key" ] + get_invited_rooms_for_user = RoomMemberStore.__dict__[ + "get_invited_rooms_for_user" + ] + get_unread_event_push_actions_by_room_for_user = ( + EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] + ) + get_unread_push_actions_for_user_in_range = ( + DataStore.get_unread_push_actions_for_user_in_range.__func__ + ) + get_push_action_users_in_range = ( + DataStore.get_push_action_users_in_range.__func__ + ) get_event = DataStore.get_event.__func__ get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ @@ -82,6 +95,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + _set_before_and_after = DataStore._set_before_and_after _get_events = DataStore._get_events.__func__ @@ -104,7 +118,7 @@ class SlavedEventStore(BaseSlavedStore): def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() - result["backfill"] = self._backfill_id_gen.get_current_token() + result["backfill"] = -self._backfill_id_gen.get_current_token() return result def process_replication(self, result): @@ -122,7 +136,7 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("backfill") if stream: - self._backfill_id_gen.advance(stream["position"]) + self._backfill_id_gen.advance(-stream["position"]) for row in stream["rows"]: self._process_replication_row( row, backfilled=True, state_resets=state_resets @@ -147,11 +161,11 @@ class SlavedEventStore(BaseSlavedStore): internal = json.loads(row[1]) event_json = json.loads(row[2]) event = FrozenEvent(event_json, internal_metadata_dict=internal) - self._invalidate_caches_for_event( + self.invalidate_caches_for_event( event, backfilled, reset_state=position in state_resets ) - def _invalidate_caches_for_event(self, event, backfilled, reset_state): + def invalidate_caches_for_event(self, event, backfilled, reset_state): if reset_state: self._get_current_state_for_key.invalidate_all() self.get_rooms_for_user.invalidate_all() @@ -163,6 +177,10 @@ class SlavedEventStore(BaseSlavedStore): self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + (event.room_id,) + ) + if not backfilled: self._events_stream_cache.entity_has_changed( event.room_id, event.internal_metadata.stream_ordering @@ -182,6 +200,7 @@ class SlavedEventStore(BaseSlavedStore): # self._membership_stream_cache.entity_has_changed( # event.state_key, event.internal_metadata.stream_ordering # ) + self.get_invited_rooms_for_user.invalidate((event.state_key,)) if not event.is_state(): return diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py new file mode 100644 index 0000000000..8faddb2595 --- /dev/null +++ b/synapse/replication/slave/storage/pushers.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore + + +class SlavedPusherStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedPusherStore, self).__init__(db_conn, hs) + self._pushers_id_gen = SlavedIdTracker( + db_conn, "pushers", "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) + + get_all_pushers = DataStore.get_all_pushers.__func__ + get_pushers_by = DataStore.get_pushers_by.__func__ + get_pushers_by_app_id_and_pushkey = ( + DataStore.get_pushers_by_app_id_and_pushkey.__func__ + ) + _decode_pushers_rows = DataStore._decode_pushers_rows.__func__ + + def stream_positions(self): + result = super(SlavedPusherStore, self).stream_positions() + result["pushers"] = self._pushers_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("pushers") + if stream: + self._pushers_id_gen.advance(stream["position"]) + + stream = result.get("deleted_pushers") + if stream: + self._pushers_id_gen.advance(stream["position"]) + + return super(SlavedPusherStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py new file mode 100644 index 0000000000..b55d5dfd08 --- /dev/null +++ b/synapse/replication/slave/storage/receipts.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore +from synapse.storage.receipts import ReceiptsStore + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. +# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedReceiptsStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedReceiptsStore, self).__init__(db_conn, hs) + + self._receipts_id_gen = SlavedIdTracker( + db_conn, "receipts_linearized", "stream_id" + ) + + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + + get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ + get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + + def stream_positions(self): + result = super(SlavedReceiptsStore, self).stream_positions() + result["receipts"] = self._receipts_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("receipts") + if stream: + self._receipts_id_gen.advance(stream["position"]) + for row in stream["rows"]: + room_id, receipt_type, user_id = row[1:4] + self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) + + return super(SlavedReceiptsStore, self).process_replication(result) + + def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): + self.get_receipts_for_user.invalidate((user_id, receipt_type)) |