diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/resource.py | 43 | ||||
-rw-r--r-- | synapse/replication/slave/storage/deviceinbox.py | 42 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 13 |
3 files changed, 72 insertions, 26 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 84993b33b3..1ed9034bcb 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -40,8 +40,8 @@ STREAM_NAMES = ( ("backfill",), ("push_rules",), ("pushers",), - ("state",), ("caches",), + ("to_device",), ) @@ -130,7 +130,6 @@ class ReplicationResource(Resource): backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() - state_token = self.store.get_state_stream_token() caches_token = self.store.get_cache_stream_token() defer.returnValue(_ReplicationToken( @@ -142,8 +141,9 @@ class ReplicationResource(Resource): backfill_token, push_rules_token, pushers_token, - state_token, + 0, # State stream is no longer a thing caches_token, + int(stream_token.to_device_key), )) @request_handler() @@ -191,8 +191,8 @@ class ReplicationResource(Resource): yield self.receipts(writer, current_token, limit, request_streams) yield self.push_rules(writer, current_token, limit, request_streams) yield self.pushers(writer, current_token, limit, request_streams) - yield self.state(writer, current_token, limit, request_streams) yield self.caches(writer, current_token, limit, request_streams) + yield self.to_device(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.info("Replicated %d rows", writer.total) @@ -366,25 +366,6 @@ class ReplicationResource(Resource): )) @defer.inlineCallbacks - def state(self, writer, current_token, limit, request_streams): - current_position = current_token.state - - state = request_streams.get("state") - - if state is not None: - state_groups, state_group_state = ( - yield self.store.get_all_new_state_groups( - state, current_position, limit - ) - ) - writer.write_header_and_rows("state_groups", state_groups, ( - "position", "room_id", "event_id" - )) - writer.write_header_and_rows("state_group_state", state_group_state, ( - "position", "type", "state_key", "event_id" - )) - - @defer.inlineCallbacks def caches(self, writer, current_token, limit, request_streams): current_position = current_token.caches @@ -398,6 +379,20 @@ class ReplicationResource(Resource): "position", "cache_func", "keys", "invalidation_ts" )) + @defer.inlineCallbacks + def to_device(self, writer, current_token, limit, request_streams): + current_position = current_token.to_device + + to_device = request_streams.get("to_device") + + if to_device is not None: + to_device_rows = yield self.store.get_all_new_device_messages( + to_device, current_position, limit + ) + writer.write_header_and_rows("to_device", to_device_rows, ( + "position", "user_id", "device_id", "message_json" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -426,7 +421,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers", "state", "caches", + "push_rules", "pushers", "state", "caches", "to_device", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py new file mode 100644 index 0000000000..64d8eb2af1 --- /dev/null +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore + + +class SlavedDeviceInboxStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) + self._device_inbox_id_gen = SlavedIdTracker( + db_conn, "device_inbox", "stream_id", + ) + + get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ + get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ + delete_messages_for_device = DataStore.delete_messages_for_device.__func__ + + def stream_positions(self): + result = super(SlavedDeviceInboxStore, self).stream_positions() + result["to_device"] = self._device_inbox_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("to_device") + if stream: + self._device_inbox_id_gen.advance(int(stream["position"])) + + return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index f4f31f2d27..cbebd5b2f7 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -120,10 +120,21 @@ class SlavedEventStore(BaseSlavedStore): get_state_for_event = DataStore.get_state_for_event.__func__ get_state_for_events = DataStore.get_state_for_events.__func__ get_state_groups = DataStore.get_state_groups.__func__ + get_state_groups_ids = DataStore.get_state_groups_ids.__func__ + get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__ + get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__ + get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__ + get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__ + _get_joined_users_from_context = ( + RoomMemberStore.__dict__["_get_joined_users_from_context"] + ) + get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ ) + is_host_joined = DataStore.is_host_joined.__func__ + _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"] get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) @@ -211,7 +222,6 @@ class SlavedEventStore(BaseSlavedStore): self._get_current_state_for_key.invalidate_all() self.get_rooms_for_user.invalidate_all() self.get_users_in_room.invalidate((event.room_id,)) - # self.get_joined_hosts_for_room.invalidate((event.room_id,)) self._invalidate_get_event_cache(event.event_id) @@ -235,7 +245,6 @@ class SlavedEventStore(BaseSlavedStore): if event.type == EventTypes.Member: self.get_rooms_for_user.invalidate((event.state_key,)) - # self.get_joined_hosts_for_room.invalidate((event.room_id,)) self.get_users_in_room.invalidate((event.room_id,)) self._membership_stream_cache.entity_has_changed( event.state_key, event.internal_metadata.stream_ordering |