diff options
Diffstat (limited to 'synapse')
24 files changed, 1005 insertions, 204 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e895b1c450..11605b34a3 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,9 +15,30 @@ class EventContext(object): + __slots__ = [ + "current_state_ids", + "prev_state_ids", + "state_group", + "rejected", + "push_actions", + "prev_group", + "delta_ids", + "prev_state_events", + ] + def __init__(self): + # The current state including the current event self.current_state_ids = None + # The current state excluding the current event self.prev_state_ids = None self.state_group = None + self.rejected = False self.push_actions = [] + + # A previously persisted state group and a delta between that + # and this state. + self.prev_group = None + self.delta_ids = None + + self.prev_state_events = None diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 627acc6a4f..78719eed25 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -138,6 +138,12 @@ class FederationClient(FederationBase): return defer.succeed(None) @log_function + def send_device_messages(self, destination): + """Sends the device messages in the local database to the remote + destination""" + self._transaction_queue.enqueue_device_messages(destination) + + @log_function def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) return defer.succeed(None) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5621655098..3fa7b2315c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -188,7 +188,7 @@ class FederationServer(FederationBase): except SynapseError as e: logger.info("Failed to handle edu %r: %r", edu_type, e) except Exception as e: - logger.exception("Failed to handle edu %r", edu_type, e) + logger.exception("Failed to handle edu %r", edu_type) else: logger.warn("Received EDU of type %s with no handler", edu_type) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index cb2ef0210c..633c79c352 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .persistence import TransactionActions -from .units import Transaction +from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor @@ -81,6 +81,8 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + self.last_device_stream_id_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) @@ -155,6 +157,17 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) + def enqueue_device_messages(self, destination): + if destination == self.server_name or destination == "localhost": + return + + if not self.can_send_to(destination): + return + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + @defer.inlineCallbacks def _attempt_new_transaction(self, destination): yield run_on_reactor() @@ -175,6 +188,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + pending_edus.extend(device_message_edus) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) @@ -184,13 +203,34 @@ class TransactionQueue(object): return yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) ) + @defer.inlineCallbacks + def _get_new_device_messages(self, destination): + last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) + to_device_stream_id = self.store.get_to_device_stream_token() + contents, stream_id = yield self.store.get_new_device_msgs_for_remote( + destination, last_device_stream_id, to_device_stream_id + ) + edus = [ + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.direct_to_device", + content=content, + ) + for content in contents + ] + defer.returnValue((edus, stream_id)) + @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures): + pending_failures, device_stream_id, + should_delete_from_device_stream): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -215,9 +255,9 @@ class TransactionQueue(object): "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", destination, txn_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures) + len(pdus), + len(edus), + len(failures) ) logger.debug("TX [%s] Persisting transaction...", destination) @@ -242,9 +282,9 @@ class TransactionQueue(object): " (PDUs: %d, EDUs: %d, failures: %d)", destination, txn_id, transaction.transaction_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures), + len(pdus), + len(edus), + len(failures), ) with limiter: @@ -299,6 +339,13 @@ class TransactionQueue(object): logger.info( "Failed to send event %s to %s", p.event_id, destination ) + else: + # Remove the acknowledged device messages from the database + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) + self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py new file mode 100644 index 0000000000..c5368e5df2 --- /dev/null +++ b/synapse/handlers/devicemessage.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.types import get_domain_from_id +from synapse.util.stringutils import random_string + + +logger = logging.getLogger(__name__) + + +class DeviceMessageHandler(object): + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.federation = hs.get_replication_layer() + + self.federation.register_edu_handler( + "m.direct_to_device", self.on_direct_to_device_edu + ) + + @defer.inlineCallbacks + def on_direct_to_device_edu(self, origin, content): + local_messages = {} + sender_user_id = content["sender"] + if origin != get_domain_from_id(sender_user_id): + logger.warn( + "Dropping device message from %r with spoofed sender %r", + origin, sender_user_id + ) + message_type = content["type"] + message_id = content["message_id"] + for user_id, by_device in content["messages"].items(): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + + stream_id = yield self.store.add_messages_from_remote_to_device_inbox( + origin, message_id, local_messages + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + @defer.inlineCallbacks + def send_device_message(self, sender_user_id, message_type, messages): + + local_messages = {} + remote_messages = {} + for user_id, by_device in messages.items(): + if self.is_mine_id(user_id): + messages_by_device = { + device_id: { + "content": message_content, + "type": message_type, + "sender": sender_user_id, + } + for device_id, message_content in by_device.items() + } + if messages_by_device: + local_messages[user_id] = messages_by_device + else: + destination = get_domain_from_id(user_id) + remote_messages.setdefault(destination, {})[user_id] = by_device + + message_id = random_string(16) + + remote_edu_contents = {} + for destination, messages in remote_messages.items(): + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + } + + stream_id = yield self.store.add_messages_to_device_inbox( + local_messages, remote_edu_contents + ) + + self.notifier.on_new_event( + "to_device_key", stream_id, users=local_messages.keys() + ) + + for destination in remote_messages.keys(): + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation.send_device_messages(destination) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dc90a5dde4..8a1038c44a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -832,11 +832,13 @@ class FederationHandler(BaseHandler): new_pdu = event - message_handler = self.hs.get_handlers().message_handler - destinations = yield message_handler.get_joined_hosts_for_room_from_state( - context + users_in_room = yield self.store.get_joined_users_from_context(event, context) + + destinations = set( + get_domain_from_id(user_id) for user_id in users_in_room + if not self.hs.is_mine_id(user_id) ) - destinations = set(destinations) + destinations.discard(origin) logger.debug( @@ -1055,11 +1057,12 @@ class FederationHandler(BaseHandler): new_pdu = event - message_handler = self.hs.get_handlers().message_handler - destinations = yield message_handler.get_joined_hosts_for_room_from_state( - context + users_in_room = yield self.store.get_joined_users_from_context(event, context) + + destinations = set( + get_domain_from_id(user_id) for user_id in users_in_room + if not self.hs.is_mine_id(user_id) ) - destinations = set(destinations) destinations.discard(origin) logger.debug( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3577db0595..178209a209 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -30,7 +30,6 @@ from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLo from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.metrics import measure_func -from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -945,7 +944,12 @@ class MessageHandler(BaseHandler): event_stream_id, max_stream_id ) - destinations = yield self.get_joined_hosts_for_room_from_state(context) + users_in_room = yield self.store.get_joined_users_from_context(event, context) + + destinations = [ + get_domain_from_id(user_id) for user_id in users_in_room + if not self.hs.is_mine_id(user_id) + ] @defer.inlineCallbacks def _notify(): @@ -963,39 +967,3 @@ class MessageHandler(BaseHandler): preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) - - def get_joined_hosts_for_room_from_state(self, context): - state_group = context.state_group - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() - - return self._get_joined_hosts_for_room_from_state( - state_group, context.current_state_ids - ) - - @cachedInlineCallbacks(num_args=1, cache_context=True) - def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids, - cache_context): - - # Don't bother getting state for people on the same HS - current_state = yield self.store.get_events([ - e_id for key, e_id in current_state_ids.items() - if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1]) - ]) - - destinations = set() - for e in current_state.itervalues(): - try: - if e.type == EventTypes.Member: - if e.content["membership"] == Membership.JOIN: - destinations.add(get_domain_from_id(e.state_key)) - except SynapseError: - logger.warn( - "Failed to get destination from event %s", e.event_id - ) - - defer.returnValue(destinations) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cf82a2336e..7a3c16a8aa 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,11 @@ bump_active_time_counter = metrics.register_counter("bump_active_time") get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) +notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) +state_transition_counter = metrics.register_counter( + "state_transition", labels=["from", "to"] +) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" @@ -646,6 +651,13 @@ class PresenceHandler(object): ) continue + if get_domain_from_id(user_id) != origin: + logger.info( + "Got presence update from %r with bad 'user_id': %r", + origin, user_id, + ) + continue + presence_state = push.get("presence", None) if not presence_state: logger.info( @@ -939,27 +951,32 @@ class PresenceHandler(object): def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. """ + if old_state == new_state: + return False + if old_state.status_msg != new_state.status_msg: + notify_reason_counter.inc("status_msg_change") return True - if old_state.state == PresenceState.ONLINE: - if new_state.state != PresenceState.ONLINE: - # Always notify for online -> anything - return True + if old_state.state != new_state.state: + notify_reason_counter.inc("state_change") + state_transition_counter.inc(old_state.state, new_state.state) + return True + if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: + notify_reason_counter.inc("current_active_change") return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive - if not (old_state.currently_active and new_state.currently_active): + if not new_state.currently_active: + notify_reason_counter.inc("last_active_change_online") return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. - return True - - if old_state.state != new_state.state: + notify_reason_counter.inc("last_active_change_not_online") return True return False diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0b530b9034..3b687957dd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -199,7 +199,14 @@ class TypingHandler(object): user_id = content["user_id"] # Check that the string is a valid user id - UserID.from_string(user_id) + user = UserID.from_string(user_id) + + if user.domain != origin: + logger.info( + "Got typing update from %r with bad 'user_id': %r", + origin, user_id, + ) + return users = yield self.state.get_current_user_in_room(room_id) domains = set(get_domain_from_id(u) for u in users) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 64d8eb2af1..251078ba57 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(BaseSlavedStore): @@ -24,6 +25,10 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen = SlavedIdTracker( db_conn, "device_inbox", "stream_id", ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", + self._device_inbox_id_gen.get_current_token() + ) get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ @@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore): stream = result.get("to_device") if stream: self._device_inbox_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + stream_id = row[0] + user_id = row[1] + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cbebd5b2f7..15c52774a2 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -86,6 +86,9 @@ class SlavedEventStore(BaseSlavedStore): _get_state_groups_from_groups = ( StateStore.__dict__["_get_state_groups_from_groups"] ) + _get_state_groups_from_groups_txn = ( + DataStore._get_state_groups_from_groups_txn.__func__ + ) _get_state_group_from_group = ( StateStore.__dict__["_get_state_group_from_group"] ) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 9c10a99acf..5975164b37 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -16,10 +16,11 @@ import logging from twisted.internet import defer -from synapse.http.servlet import parse_json_object_from_request from synapse.http import servlet +from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.v1.transactions import HttpTransactionStore + from ._base import client_v2_patterns logger = logging.getLogger(__name__) @@ -39,10 +40,8 @@ class SendToDeviceRestServlet(servlet.RestServlet): super(SendToDeviceRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.store = hs.get_datastore() - self.notifier = hs.get_notifier() - self.is_mine_id = hs.is_mine_id self.txns = HttpTransactionStore() + self.device_message_handler = hs.get_device_message_handler() @defer.inlineCallbacks def on_PUT(self, request, message_type, txn_id): @@ -57,28 +56,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): content = parse_json_object_from_request(request) - # TODO: Prod the notifier to wake up sync streams. - # TODO: Implement replication for the messages. - # TODO: Send the messages to remote servers if needed. - - local_messages = {} - for user_id, by_device in content["messages"].items(): - if self.is_mine_id(user_id): - messages_by_device = { - device_id: { - "content": message_content, - "type": message_type, - "sender": requester.user.to_string(), - } - for device_id, message_content in by_device.items() - } - if messages_by_device: - local_messages[user_id] = messages_by_device - - stream_id = yield self.store.add_messages_to_device_inbox(local_messages) - - self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + sender_user_id = requester.user.to_string() + + yield self.device_message_handler.send_device_message( + sender_user_id, message_type, content["messages"] ) response = (200, {}) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 9f0625a822..a45ee9483e 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -45,7 +45,14 @@ class DownloadResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): - request.setHeader("Content-Security-Policy", "sandbox") + request.setHeader( + "Content-Security-Policy", + "default-src 'none';" + " script-src 'none';" + " plugin-types application/pdf;" + " style-src 'unsafe-inline';" + " object-src 'self';" + ) server_name, media_id, name = parse_media_id(request) if server_name == self.server_name: yield self._respond_local_file(request, media_id, name) diff --git a/synapse/server.py b/synapse/server.py index af3246504b..f516f08167 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -35,6 +35,7 @@ from synapse.federation import initialize_http_replication from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler +from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler @@ -100,6 +101,7 @@ class HomeServer(object): 'application_service_api', 'application_service_scheduler', 'application_service_handler', + 'device_message_handler', 'notifier', 'distributor', 'client_resource', @@ -205,6 +207,9 @@ class HomeServer(object): def build_device_handler(self): return DeviceHandler(self) + def build_device_message_handler(self): + return DeviceMessageHandler(self) + def build_e2e_keys_handler(self): return E2eKeysHandler(self) diff --git a/synapse/state.py b/synapse/state.py index cd792afed1..4520fa0415 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -55,12 +55,15 @@ def _gen_state_id(): class _StateCacheEntry(object): - __slots__ = ["state", "state_group", "state_id"] + __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] - def __init__(self, state, state_group): + def __init__(self, state, state_group, prev_group=None, delta_ids=None): self.state = state self.state_group = state_group + self.prev_group = prev_group + self.delta_ids = delta_ids + # The `state_id` is a unique ID we generate that can be used as ID for # this collection of state. Usually this would be the same as the # state group, but on worker instances we can't generate a new state @@ -245,11 +248,20 @@ class StateHandler(object): if key in context.prev_state_ids: replaces = context.prev_state_ids[key] event.unsigned["replaces_state"] = replaces + context.current_state_ids = dict(context.prev_state_ids) context.current_state_ids[key] = event.event_id + + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + if context.delta_ids is not None: + context.delta_ids[key] = event.event_id else: context.current_state_ids = context.prev_state_ids + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + context.prev_state_events = [] defer.returnValue(context) @@ -283,6 +295,8 @@ class StateHandler(object): defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, + prev_group=name, + delta_ids={}, )) with (yield self.resolve_linearizer.queue(group_names)): @@ -340,9 +354,24 @@ class StateHandler(object): if hasattr(self.store, "get_next_state_group"): state_group = self.store.get_next_state_group() + prev_group = None + delta_ids = None + for old_group, old_ids in state_groups_ids.items(): + if not set(new_state.iterkeys()) - set(old_ids.iterkeys()): + n_delta_ids = { + k: v + for k, v in new_state.items() + if old_ids.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + cache = _StateCacheEntry( state=new_state, state_group=state_group, + prev_group=prev_group, + delta_ids=delta_ids, ) if self._state_cache is not None: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..828e5ca60b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 68116b0394..0d37bb961b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -27,63 +27,139 @@ logger = logging.getLogger(__name__) class DeviceInboxStore(SQLBaseStore): @defer.inlineCallbacks - def add_messages_to_device_inbox(self, messages_by_user_then_device): - """ + def add_messages_to_device_inbox(self, local_messages_by_user_then_device, + remote_messages_by_destination): + """Used to send messages from this server. + Args: - messages_by_user_and_device(dict): + sender_user_id(str): The ID of the user sending these messages. + local_messages_by_user_and_device(dict): Dictionary of user_id to device_id to message. + remote_messages_by_destination(dict): + Dictionary of destination server_name to the EDU JSON to send. Returns: A deferred stream_id that resolves when the messages have been inserted. """ - def select_devices_txn(txn, user_id, devices): - if not devices: - return [] - sql = ( - "SELECT user_id, device_id FROM devices" - " WHERE user_id = ? AND device_id IN (" - + ",".join("?" * len(devices)) - + ")" + def add_messages_txn(txn, now_ms, stream_id): + # Add the local messages directly to the local inbox. + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device ) - # TODO: Maybe this needs to be done in batches if there are - # too many local devices for a given user. - args = [user_id] + devices - txn.execute(sql, args) - return [tuple(row) for row in txn.fetchall()] - - def add_messages_to_device_inbox_txn(txn, stream_id): - local_users_and_devices = set() - for user_id, messages_by_device in messages_by_user_then_device.items(): - local_users_and_devices.update( - select_devices_txn(txn, user_id, messages_by_device.keys()) - ) + # Add the remote messages to the federation outbox. + # We'll send them to a remote server when we next send a + # federation transaction to that destination. sql = ( - "INSERT INTO device_inbox" - " (user_id, device_id, stream_id, message_json)" + "INSERT INTO device_federation_outbox" + " (destination, stream_id, queued_ts, messages_json)" " VALUES (?,?,?,?)" ) rows = [] - for user_id, messages_by_device in messages_by_user_then_device.items(): - for device_id, message in messages_by_device.items(): - message_json = ujson.dumps(message) - # Only insert into the local inbox if the device exists on - # this server - if (user_id, device_id) in local_users_and_devices: - rows.append((user_id, device_id, stream_id, message_json)) - + for destination, edu in remote_messages_by_destination.items(): + edu_json = ujson.dumps(edu) + rows.append((destination, stream_id, now_ms, edu_json)) txn.executemany(sql, rows) with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_msec() yield self.runInteraction( "add_messages_to_device_inbox", - add_messages_to_device_inbox_txn, - stream_id + add_messages_txn, + now_ms, + stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) + @defer.inlineCallbacks + def add_messages_from_remote_to_device_inbox( + self, origin, message_id, local_messages_by_user_then_device + ): + def add_messages_txn(txn, now_ms, stream_id): + # Check if we've already inserted a matching message_id for that + # origin. This can happen if the origin doesn't receive our + # acknowledgement from the first time we received the message. + already_inserted = self._simple_select_one_txn( + txn, table="device_federation_inbox", + keyvalues={"origin": origin, "message_id": message_id}, + retcols=("message_id",), + allow_none=True, + ) + if already_inserted is not None: + return + + # Add an entry for this message_id so that we know we've processed + # it. + self._simple_insert_txn( + txn, table="device_federation_inbox", + values={ + "origin": origin, + "message_id": message_id, + "received_ts": now_ms, + }, + ) + + # Add the messages to the approriate local device inboxes so that + # they'll be sent to the devices when they next sync. + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_msec() + yield self.runInteraction( + "add_messages_from_remote_to_device_inbox", + add_messages_txn, + now_ms, + stream_id, + ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + + def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, + messages_by_user_then_device): + local_users_and_devices = set() + for user_id, messages_by_device in messages_by_user_then_device.items(): + devices = messages_by_device.keys() + sql = ( + "SELECT user_id, device_id FROM devices" + " WHERE user_id = ? AND device_id IN (" + + ",".join("?" * len(devices)) + + ")" + ) + # TODO: Maybe this needs to be done in batches if there are + # too many local devices for a given user. + txn.execute(sql, [user_id] + devices) + local_users_and_devices.update(map(tuple, txn.fetchall())) + + sql = ( + "INSERT INTO device_inbox" + " (user_id, device_id, stream_id, message_json)" + " VALUES (?,?,?,?)" + ) + rows = [] + for user_id, messages_by_device in messages_by_user_then_device.items(): + for device_id, message in messages_by_device.items(): + message_json = ujson.dumps(message) + # Only insert into the local inbox if the device exists on + # this server + if (user_id, device_id) in local_users_and_devices: + rows.append((user_id, device_id, stream_id, message_json)) + + txn.executemany(sql, rows) + def get_new_messages_for_device( self, user_id, device_id, last_stream_id, current_stream_id, limit=100 ): @@ -97,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -182,3 +264,71 @@ class DeviceInboxStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + + def get_new_device_msgs_for_remote( + self, destination, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + destination(str): The name of the remote server. + last_stream_id(int): The last position of the device message stream + that the server sent up to. + current_stream_id(int): The current position of the device + message stream. + Returns: + Deferred ([dict], int): List of messages for the device and where + in the stream the messages got to. + """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + + def get_new_messages_for_remote_destination_txn(txn): + sql = ( + "SELECT stream_id, messages_json FROM device_federation_outbox" + " WHERE destination = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + destination, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn.fetchall(): + stream_pos = row[0] + messages.append(ujson.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_device_msgs_for_remote", + get_new_messages_for_remote_destination_txn, + ) + + def delete_device_msgs_for_remote(self, destination, up_to_stream_id): + """Used to delete messages when the remote destination acknowledges + their receipt. + + Args: + destination(str): The destination server_name + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves when the messages have been deleted. + """ + def delete_messages_for_remote_destination_txn(txn): + sql = ( + "DELETE FROM device_federation_outbox" + " WHERE destination = ?" + " AND stream_id <= ?" + ) + txn.execute(sql, (destination, up_to_stream_id)) + + return self.runInteraction( + "delete_device_msgs_for_remote", + delete_messages_for_remote_destination_txn + ) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index afd6530cab..17920d4480 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -54,8 +54,12 @@ class DeviceStore(SQLBaseStore): or_ignore=ignore_if_known, ) except Exception as e: - logger.error("store_device with device_id=%s failed: %s", - device_id, e) + logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" + " display_name=%s(%r) failed: %s", + type(device_id).__name__, device_id, + type(user_id).__name__, user_id, + type(initial_device_display_name).__name__, + initial_device_display_name, e) raise StoreError(500, "Problem storing device.") def get_device(self, user_id, device_id): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1a7d4c5199..ed182c8d11 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore): # insert into the state_group, state_groups_state and # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, ((event, context),)) + try: + self._store_mult_state_groups_txn(txn, ((event, context),)) + except Exception: + logger.exception("") + raise metadata_json = encode_json( event.internal_metadata.get_dict() @@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + for event_id, state_key in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -1582,7 +1589,66 @@ class EventsStore(SQLBaseStore): " GROUP BY state_group HAVING MAX(topological_ordering) < ?", (room_id, topological_ordering, topological_ordering) ) + state_rows = txn.fetchall() + state_groups_to_delete = [sg for sg, in state_rows] + + # Now we get all the state groups that rely on these state groups + new_state_edges = [] + chunks = [ + state_groups_to_delete[i:i + 100] + for i in xrange(0, len(state_groups_to_delete), 100) + ] + for chunk in chunks: + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=chunk, + retcols=["state_group"], + keyvalues={}, + ) + new_state_edges.extend(row["state_group"] for row in rows) + + # Now we turn the state groups that reference to-be-deleted state groups + # to non delta versions. + for new_state_edge in new_state_edges: + curr_state = self._get_state_groups_from_groups_txn( + txn, [new_state_edge], types=None + ) + curr_state = curr_state[new_state_edge] + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": new_state_edge, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in curr_state.items() + ], + ) + txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b94ce7bea1..7efbe51cda 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 34 +SCHEMA_VERSION = 35 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -242,7 +242,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module = imp.load_source( module_name, absolute_path, python_file ) - logger.debug("Running script %s", relative_path) + logger.info("Running script %s", relative_path) module.run_create(cur, database_engine) if not is_empty: module.run_upgrade(cur, database_engine, config=config) @@ -253,7 +253,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, pass elif ext == ".sql": # A plain old .sql file, just read and execute it - logger.debug("Applying schema %s", relative_path) + logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) else: # Not a valid delta file. diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql new file mode 100644 index 0000000000..e87066d9a1 --- /dev/null +++ b/synapse/storage/schema/delta/34/device_outbox.sql @@ -0,0 +1,36 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE device_federation_outbox ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + queued_ts BIGINT NOT NULL, + messages_json TEXT NOT NULL +); + + +CREATE INDEX device_federation_outbox_destination_id + ON device_federation_outbox(destination, stream_id); + + +CREATE TABLE device_federation_inbox ( + origin TEXT NOT NULL, + message_id TEXT NOT NULL, + received_ts BIGINT NOT NULL +); + + +CREATE INDEX device_federation_inbox_sender_id + ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql new file mode 100644 index 0000000000..0f1fa68a89 --- /dev/null +++ b/synapse/storage/schema/delta/35/state.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE state_group_edges( + state_group BIGINT NOT NULL, + prev_state_group BIGINT NOT NULL +); + +CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); +CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group); diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql new file mode 100644 index 0000000000..97e5067ef4 --- /dev/null +++ b/synapse/storage/schema/delta/35/state_dedupe.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('state_group_state_deduplication', '{}'); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ec551b0b4f..fef87834ca 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -24,6 +25,9 @@ import logging logger = logging.getLogger(__name__) +MAX_STATE_DELTA_HOPS = 100 + + class StateStore(SQLBaseStore): """ Keeps track of the state at a given event. @@ -43,6 +47,15 @@ class StateStore(SQLBaseStore): * `state_groups_state`: Maps state group to state events. """ + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + + def __init__(self, hs): + super(StateStore, self).__init__(hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: @@ -103,11 +116,8 @@ class StateStore(SQLBaseStore): state_groups[event.event_id] = context.state_group if self._have_persisted_state_group_txn(txn, context.state_group): - logger.info("Already persisted state_group: %r", context.state_group) continue - state_event_ids = dict(context.current_state_ids) - self._simple_insert_txn( txn, table="state_groups", @@ -118,20 +128,51 @@ class StateStore(SQLBaseStore): }, ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't tooo long, as otherwise read performance degrades. + if context.prev_group: + potential_hops = self._count_state_group_hops_txn( + txn, context.prev_group + ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in state_event_ids.items() - ], - ) + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.current_state_ids.items() + ], + ) self._simple_insert_many_txn( txn, @@ -145,6 +186,47 @@ class StateStore(SQLBaseStore): ], ) + def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): if event_type and state_key is not None: @@ -206,48 +288,108 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ - def f(txn, groups): - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - - sql = ( - "SELECT state_group, event_id, type, state_key" - " FROM state_groups_state WHERE" - " state_group IN (%s) %s" % ( - ",".join("?" for _ in groups), - where_clause, - ) - ) - - args = list(groups) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - - results = {group: {} for group in groups} - for row in rows: - key = (row["type"], row["state_key"]) - results[row["state_group"]][key] = row["event_id"] - return results - results = {} chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - f, chunk + self._get_state_groups_from_groups_txn, chunk, types, ) results.update(res) defer.returnValue(results) + def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + + results = {group: {} for group in groups} + if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. This is + # a temporary hack until we can add the right indices in + txn.execute("SET LOCAL enable_seqscan=off") + + # The below query walks the state_group tree so that the "state" + # table includes all state_groups in the tree. It then joins + # against `state_groups_state` to fetch the latest state. + # It assumes that previous state groups are always numerically + # lesser. + # The PARTITION is used to get the event_id in the greatest state + # group for the given type, state_key. + # This may return multiple rows per (type, state_key), but last_value + # should be the same. + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT type, state_key, last_value(event_id) OVER ( + PARTITION BY type, state_key ORDER BY state_group ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS event_id FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM state + ) + %s; + """) % (where_clause,) + + for group in groups: + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + for group in groups: + group_tree = [group] + next_group = group + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + group_tree.append(next_group) + + sql = (""" + SELECT type, state_key, event_id FROM state_groups_state + INNER JOIN ( + SELECT type, state_key, max(state_group) as state_group + FROM state_groups_state + WHERE state_group IN (%s) %s + GROUP BY type, state_key + ) USING (type, state_key, state_group); + """) % (",".join("?" for _ in group_tree), where_clause,) + + args = list(group_tree) + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + + return results + @defer.inlineCallbacks def get_state_for_events(self, event_ids, types): """Given a list of event_ids and type tuples, return a list of state @@ -504,32 +646,150 @@ class StateStore(SQLBaseStore): defer.returnValue(results) - def get_all_new_state_groups(self, last_id, current_id, limit): - def get_all_new_state_groups_txn(txn): - sql = ( - "SELECT id, room_id, event_id FROM state_groups" - " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?" + def get_next_state_group(self): + return self._state_groups_id_gen.get_next() + + @defer.inlineCallbacks + def _background_deduplicate_state(self, progress, batch_size): + """This background update will slowly deduplicate state by reencoding + them as deltas. + """ + last_state_group = progress.get("last_state_group", 0) + rows_inserted = progress.get("rows_inserted", 0) + max_group = progress.get("max_group", None) + + BATCH_SIZE_SCALE_FACTOR = 100 + + batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR)) + + if max_group is None: + rows = yield self._execute( + "_background_deduplicate_state", None, + "SELECT coalesce(max(id), 0) FROM state_groups", ) - txn.execute(sql, (last_id, current_id, limit)) - groups = txn.fetchall() + max_group = rows[0][0] + + def reindex_txn(txn): + new_last_state_group = last_state_group + for count in xrange(batch_size): + txn.execute( + "SELECT id, room_id FROM state_groups" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC" + " LIMIT 1", + (new_last_state_group, max_group,) + ) + row = txn.fetchone() + if row: + state_group, room_id = row - if not groups: - return ([], []) + if not row or not state_group: + return True, count - lower_bound = groups[0][0] - upper_bound = groups[-1][0] - sql = ( - "SELECT state_group, type, state_key, event_id" - " FROM state_groups_state" - " WHERE ? <= state_group AND state_group <= ?" + txn.execute( + "SELECT state_group FROM state_group_edges" + " WHERE state_group = ?", + (state_group,) + ) + + # If we reach a point where we've already started inserting + # edges we should stop. + if txn.fetchall(): + return True, count + + txn.execute( + "SELECT coalesce(max(id), 0) FROM state_groups" + " WHERE id < ? AND room_id = ?", + (state_group, room_id,) + ) + prev_group, = txn.fetchone() + new_last_state_group = state_group + + if prev_group: + potential_hops = self._count_state_group_hops_txn( + txn, prev_group + ) + if potential_hops >= MAX_STATE_DELTA_HOPS: + # We want to ensure chains are at most this long,# + # otherwise read performance degrades. + continue + + prev_state = self._get_state_groups_from_groups_txn( + txn, [prev_group], types=None + ) + prev_state = prev_state[prev_group] + + curr_state = self._get_state_groups_from_groups_txn( + txn, [state_group], types=None + ) + curr_state = curr_state[state_group] + + if not set(prev_state.keys()) - set(curr_state.keys()): + # We can only do a delta if the current has a strict super set + # of keys + + delta_state = { + key: value for key, value in curr_state.items() + if prev_state.get(key, None) != value + } + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + } + ) + + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": state_group, + "prev_state_group": prev_group, + } + ) + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": state_group, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in delta_state.items() + ], + ) + + progress = { + "last_state_group": state_group, + "rows_inserted": rows_inserted + batch_size, + "max_group": max_group, + } + + self._background_update_progress_txn( + txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress ) - txn.execute(sql, (lower_bound, upper_bound)) - state_group_state = txn.fetchall() - return (groups, state_group_state) - return self.runInteraction( - "get_all_new_state_groups", get_all_new_state_groups_txn + return False, batch_size + + finished, result = yield self.runInteraction( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn ) - def get_next_state_group(self): - return self._state_groups_id_gen.get_next() + if finished: + yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) + + defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR) |