diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-04-14 13:30:57 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-04-14 13:30:57 +0100 |
commit | f41b1a87237dd670d469aaed528fbd8e9b66c9d3 (patch) | |
tree | e305d52e355d4caea9dab83fa628bf4f68b29d1e | |
parent | Optionally split out the pusher into a separate process (diff) | |
download | synapse-f41b1a87237dd670d469aaed528fbd8e9b66c9d3.tar.xz |
Make push sort of work
-rw-r--r-- | synapse/app/pusher.py | 13 | ||||
-rw-r--r-- | synapse/replication/slave/storage/event_push_actions.py | 25 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/receipts.py | 62 |
4 files changed, 96 insertions, 8 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f77deeed85..7755fbf00a 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -24,7 +24,9 @@ from synapse.config.logger import LoggingConfig from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.event_push_actions import SlavedPushActionsStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine +from synapse.storage import DataStore from synapse.util.async import sleep from synapse.util.logcontext import (LoggingContext, preserve_fn) @@ -40,7 +42,7 @@ class SlaveConfig(DatabaseConfig): def read_config(self, config): self.replication_url = config["replication_url"] self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = False + self.use_insecure_ssl_client_just_for_testing_do_not_use = True self.user_agent_suffix = None self.start_pushers = True @@ -58,9 +60,13 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig): class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedPushActionsStore + SlavedPushActionsStore, + SlavedEventStore, SlavedPusherStore, + SlavedReceiptsStore ): - pass + update_pusher_last_stream_ordering_and_success = ( + DataStore.update_pusher_last_stream_ordering_and_success.__func__ + ) class PusherServer(HomeServer): @@ -135,7 +141,6 @@ class PusherServer(HomeServer): args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) - logger.error("FNARG %r", result) yield store.process_replication(result) poke_pushers(result) except: diff --git a/synapse/replication/slave/storage/event_push_actions.py b/synapse/replication/slave/storage/event_push_actions.py index 7ff1b8531a..26efe6daf6 100644 --- a/synapse/replication/slave/storage/event_push_actions.py +++ b/synapse/replication/slave/storage/event_push_actions.py @@ -14,12 +14,17 @@ # limitations under the License. -from ._base import BaseSlavedStore +from .events import SlavedEventStore +from .receipts import SlavedReceiptsStore from synapse.storage import DataStore +from synapse.storage.event_push_actions import EventPushActionsStore -class SlavedPushActionsStore(BaseSlavedStore): +class SlavedPushActionsStore(SlavedEventStore, SlavedReceiptsStore): + get_unread_event_push_actions_by_room_for_user = ( + EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -28,3 +33,19 @@ class SlavedPushActionsStore(BaseSlavedStore): get_push_action_users_in_range = ( DataStore.get_push_action_users_in_range.__func__ ) + + def invalidate_caches_for_event(self, event, backfilled, reset_state): + self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + (event.room_id,) + ) + super(SlavedPushActionsStore, self).invalidate_caches_for_event( + event, backfilled, reset_state + ) + + def invalidate_caches_for_receipt(self, user_id, room_id): + self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + (room_id,) + ) + super(SlavedPushActionsStore, self).invalidate_caches_for_receipt( + user_id, room_id + ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 25802ed44f..82f171c257 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -151,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore): internal = json.loads(row[1]) event_json = json.loads(row[2]) event = FrozenEvent(event_json, internal_metadata_dict=internal) - self._invalidate_caches_for_event( + self.invalidate_caches_for_event( event, backfilled, reset_state=position in state_resets ) - def _invalidate_caches_for_event(self, event, backfilled, reset_state): + def invalidate_caches_for_event(self, event, backfilled, reset_state): if reset_state: self._get_current_state_for_key.invalidate_all() self.get_rooms_for_user.invalidate_all() diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py new file mode 100644 index 0000000000..aaf9015ebf --- /dev/null +++ b/synapse/replication/slave/storage/receipts.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore +from synapse.storage.receipts import ReceiptsStore + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. +# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedReceiptsStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedReceiptsStore, self).__init__(db_conn, hs) + + self._receipts_id_gen = SlavedIdTracker( + db_conn, "receipts_linearized", "stream_id" + ) + + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + + get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ + + def stream_positions(self): + result = super(SlavedReceiptsStore, self).stream_positions() + result["receipts"] = self._receipts_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("receipts") + if stream: + self._receipts_id_gen.advance(stream["position"]) + for row in stream["rows"]: + room_id = row[1] + user_id = row[3] + self.invalidate_caches_for_receipt(user_id, room_id) + self.get_receipts_for_user.invalidate((user_id,)) + + return super(SlavedReceiptsStore, self).process_replication(result) + + def invalidate_caches_for_receipt(self, user_id, room_id): + self.get_receipts_for_user.invalidate((user_id,)) |