diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e895b1c450..11605b34a3 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,9 +15,30 @@
class EventContext(object):
+ __slots__ = [
+ "current_state_ids",
+ "prev_state_ids",
+ "state_group",
+ "rejected",
+ "push_actions",
+ "prev_group",
+ "delta_ids",
+ "prev_state_events",
+ ]
+
def __init__(self):
+ # The current state including the current event
self.current_state_ids = None
+ # The current state excluding the current event
self.prev_state_ids = None
self.state_group = None
+
self.rejected = False
self.push_actions = []
+
+ # A previously persisted state group and a delta between that
+ # and this state.
+ self.prev_group = None
+ self.delta_ids = None
+
+ self.prev_state_events = None
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 627acc6a4f..78719eed25 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -138,6 +138,12 @@ class FederationClient(FederationBase):
return defer.succeed(None)
@log_function
+ def send_device_messages(self, destination):
+ """Asks the transaction queue to send the device messages held in the
+ local database to the remote destination.
+
+ This only calls `enqueue_device_messages` on the transaction queue;
+ this method body returns no value.
+
+ Args:
+ destination: the remote destination, passed through unchanged.
+ """
+ self._transaction_queue.enqueue_device_messages(destination)
+
+ @log_function
def send_failure(self, failure, destination):
self._transaction_queue.enqueue_failure(failure, destination)
return defer.succeed(None)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5621655098..3fa7b2315c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -188,7 +188,7 @@ class FederationServer(FederationBase):
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
- logger.exception("Failed to handle edu %r", edu_type, e)
+ logger.exception("Failed to handle edu %r", edu_type)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index cb2ef0210c..5c7245d383 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
from .persistence import TransactionActions
-from .units import Transaction
+from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
@@ -81,6 +81,8 @@ class TransactionQueue(object):
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}
+ self.last_device_stream_id_by_dest = {}
+
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
@@ -155,6 +157,17 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
+ def enqueue_device_messages(self, destination):
+ """Trigger a transaction attempt so pending device messages get sent.
+
+ Nothing is added to the in-memory pending queues here: the messages
+ themselves are fetched from the store by `_attempt_new_transaction`
+ (via `_get_new_device_messages`).
+
+ Args:
+ destination: name of the remote server to send to. Ignored if it is
+ this server, "localhost", or a destination `can_send_to` rejects.
+ """
+ # Never federate to ourselves.
+ if destination == self.server_name or destination == "localhost":
+ return
+
+ if not self.can_send_to(destination):
+ return
+
+ # Fire-and-forget: the result of the transaction attempt is not awaited.
+ preserve_context_over_fn(
+ self._attempt_new_transaction, destination
+ )
+
@defer.inlineCallbacks
def _attempt_new_transaction(self, destination):
yield run_on_reactor()
@@ -175,22 +188,50 @@ class TransactionQueue(object):
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ device_message_edus, device_stream_id = (
+ yield self._get_new_device_messages(destination)
+ )
+
+ pending_edus.extend(device_message_edus)
+
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures:
logger.debug("TX [%s] Nothing to send", destination)
+ self.last_device_stream_id_by_dest[destination] = device_stream_id
return
yield self._send_new_transaction(
- destination, pending_pdus, pending_edus, pending_failures
+ destination, pending_pdus, pending_edus, pending_failures,
+ device_stream_id,
+ should_delete_from_device_stream=bool(device_message_edus)
)
+ @defer.inlineCallbacks
+ def _get_new_device_messages(self, destination):
+ """Fetch new device messages for a destination and wrap each in an EDU.
+
+ Args:
+ destination: name of the remote server the messages are for.
+
+ Returns:
+ Deferred resolving to a tuple `(edus, stream_id)`: one
+ "m.direct_to_device" Edu per message content returned by the store,
+ and the stream id the store returned alongside them (presumably the
+ position the returned messages reach up to -- confirm against
+ `get_new_device_msgs_for_remote`).
+ """
+ # Resume from the position last recorded for this destination (0 if none).
+ last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
+ to_device_stream_id = self.store.get_to_device_stream_token()
+ contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
+ destination, last_device_stream_id, to_device_stream_id
+ )
+ edus = [
+ Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type="m.direct_to_device",
+ content=content,
+ )
+ for content in contents
+ ]
+ defer.returnValue((edus, stream_id))
+
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
- pending_failures):
+ pending_failures, device_stream_id,
+ should_delete_from_device_stream):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@@ -215,9 +256,9 @@ class TransactionQueue(object):
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
- len(pending_pdus),
- len(pending_edus),
- len(pending_failures)
+ len(pdus),
+ len(edus),
+ len(failures)
)
logger.debug("TX [%s] Persisting transaction...", destination)
@@ -242,9 +283,9 @@ class TransactionQueue(object):
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
- len(pending_pdus),
- len(pending_edus),
- len(pending_failures),
+ len(pdus),
+ len(edus),
+ len(failures),
)
with limiter:
@@ -299,6 +340,13 @@ class TransactionQueue(object):
logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
+ else:
+ # Remove the acknowledged device messages from the database
+ if should_delete_from_device_stream:
+ yield self.store.delete_device_msgs_for_remote(
+ destination, device_stream_id
+ )
+ self.last_device_stream_id_by_dest[destination] = device_stream_id
except NotRetryingDestination:
logger.info(
"TX [%s] not ready for retry yet - "
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
new file mode 100644
index 0000000000..c5368e5df2
--- /dev/null
+++ b/synapse/handlers/devicemessage.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.types import get_domain_from_id
+from synapse.util.stringutils import random_string
+
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceMessageHandler(object):
+    """Handles to-device messages, both those sent by local clients and those
+    received from remote servers as "m.direct_to_device" EDUs.
+    """
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.federation = hs.get_replication_layer()
+
+        # Route incoming device-message EDUs from other servers to us.
+        self.federation.register_edu_handler(
+            "m.direct_to_device", self.on_direct_to_device_edu
+        )
+
+    @defer.inlineCallbacks
+    def on_direct_to_device_edu(self, origin, content):
+        """Store the messages from an incoming "m.direct_to_device" EDU and
+        wake up the recipients.
+
+        EDUs whose "sender" does not belong to `origin` are dropped.
+
+        Args:
+            origin (str): name of the server the EDU was received from.
+            content (dict): EDU content. The keys "sender", "type",
+                "message_id" and "messages" (user_id -> device_id -> message
+                content) are read.
+        """
+        sender_user_id = content["sender"]
+        if origin != get_domain_from_id(sender_user_id):
+            logger.warn(
+                "Dropping device message from %r with spoofed sender %r",
+                origin, sender_user_id
+            )
+            # Actually drop it. Without this return the spoofed message
+            # would still be written to the inbox and delivered.
+            return
+
+        local_messages = {}
+        message_type = content["type"]
+        message_id = content["message_id"]
+        for user_id, by_device in content["messages"].items():
+            messages_by_device = {
+                device_id: {
+                    "content": message_content,
+                    "type": message_type,
+                    "sender": sender_user_id,
+                }
+                for device_id, message_content in by_device.items()
+            }
+            # Skip users for whom the EDU carried no per-device messages.
+            if messages_by_device:
+                local_messages[user_id] = messages_by_device
+
+        stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+            origin, message_id, local_messages
+        )
+
+        self.notifier.on_new_event(
+            "to_device_key", stream_id, users=local_messages.keys()
+        )
+
+    @defer.inlineCallbacks
+    def send_device_message(self, sender_user_id, message_type, messages):
+        """Send to-device messages on behalf of a local user.
+
+        Messages for users on this server are passed to the store as local
+        messages; messages for other users are grouped by the recipient's
+        server into one EDU content per destination, and the federation layer
+        is then asked to send to each of those destinations.
+
+        Args:
+            sender_user_id (str): user ID recorded as the sender.
+            message_type (str): recorded as the "type" of every message.
+            messages (dict): user_id -> device_id -> message content.
+        """
+        local_messages = {}
+        remote_messages = {}
+        for user_id, by_device in messages.items():
+            if self.is_mine_id(user_id):
+                messages_by_device = {
+                    device_id: {
+                        "content": message_content,
+                        "type": message_type,
+                        "sender": sender_user_id,
+                    }
+                    for device_id, message_content in by_device.items()
+                }
+                if messages_by_device:
+                    local_messages[user_id] = messages_by_device
+            else:
+                destination = get_domain_from_id(user_id)
+                remote_messages.setdefault(destination, {})[user_id] = by_device
+
+        # A single message_id is shared by the EDUs for every destination.
+        message_id = random_string(16)
+
+        remote_edu_contents = {}
+        # Named so that it does not shadow the `messages` parameter.
+        for destination, messages_by_user in remote_messages.items():
+            remote_edu_contents[destination] = {
+                "messages": messages_by_user,
+                "sender": sender_user_id,
+                "type": message_type,
+                "message_id": message_id,
+            }
+
+        stream_id = yield self.store.add_messages_to_device_inbox(
+            local_messages, remote_edu_contents
+        )
+
+        self.notifier.on_new_event(
+            "to_device_key", stream_id, users=local_messages.keys()
+        )
+
+        for destination in remote_messages:
+            # Enqueue a new federation transaction to send the new
+            # device messages to each remote destination.
+            self.federation.send_device_messages(destination)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index dc90a5dde4..8a1038c44a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -832,11 +832,13 @@ class FederationHandler(BaseHandler):
new_pdu = event
- message_handler = self.hs.get_handlers().message_handler
- destinations = yield message_handler.get_joined_hosts_for_room_from_state(
- context
+ users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+ destinations = set(
+ get_domain_from_id(user_id) for user_id in users_in_room
+ if not self.hs.is_mine_id(user_id)
)
- destinations = set(destinations)
+
destinations.discard(origin)
logger.debug(
@@ -1055,11 +1057,12 @@ class FederationHandler(BaseHandler):
new_pdu = event
- message_handler = self.hs.get_handlers().message_handler
- destinations = yield message_handler.get_joined_hosts_for_room_from_state(
- context
+ users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+ destinations = set(
+ get_domain_from_id(user_id) for user_id in users_in_room
+ if not self.hs.is_mine_id(user_id)
)
- destinations = set(destinations)
destinations.discard(origin)
logger.debug(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3577db0595..178209a209 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,6 @@ from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLo
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.metrics import measure_func
-from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -945,7 +944,12 @@ class MessageHandler(BaseHandler):
event_stream_id, max_stream_id
)
- destinations = yield self.get_joined_hosts_for_room_from_state(context)
+ users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+ destinations = [
+ get_domain_from_id(user_id) for user_id in users_in_room
+ if not self.hs.is_mine_id(user_id)
+ ]
@defer.inlineCallbacks
def _notify():
@@ -963,39 +967,3 @@ class MessageHandler(BaseHandler):
preserve_fn(federation_handler.handle_new_event)(
event, destinations=destinations,
)
-
- def get_joined_hosts_for_room_from_state(self, context):
- state_group = context.state_group
- if not state_group:
- # If state_group is None it means it has yet to be assigned a
- # state group, i.e. we need to make sure that calls with a state_group
- # of None don't hit previous cached calls with a None state_group.
- # To do this we set the state_group to a new object as object() != object()
- state_group = object()
-
- return self._get_joined_hosts_for_room_from_state(
- state_group, context.current_state_ids
- )
-
- @cachedInlineCallbacks(num_args=1, cache_context=True)
- def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids,
- cache_context):
-
- # Don't bother getting state for people on the same HS
- current_state = yield self.store.get_events([
- e_id for key, e_id in current_state_ids.items()
- if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1])
- ])
-
- destinations = set()
- for e in current_state.itervalues():
- try:
- if e.type == EventTypes.Member:
- if e.content["membership"] == Membership.JOIN:
- destinations.add(get_domain_from_id(e.state_key))
- except SynapseError:
- logger.warn(
- "Failed to get destination from event %s", e.event_id
- )
-
- defer.returnValue(destinations)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf82a2336e..7a3c16a8aa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,11 @@ bump_active_time_counter = metrics.register_counter("bump_active_time")
get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
+state_transition_counter = metrics.register_counter(
+ "state_transition", labels=["from", "to"]
+)
+
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
@@ -646,6 +651,13 @@ class PresenceHandler(object):
)
continue
+ if get_domain_from_id(user_id) != origin:
+ logger.info(
+ "Got presence update from %r with bad 'user_id': %r",
+ origin, user_id,
+ )
+ continue
+
presence_state = push.get("presence", None)
if not presence_state:
logger.info(
@@ -939,27 +951,32 @@ class PresenceHandler(object):
def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
"""
+ if old_state == new_state:
+ return False
+
if old_state.status_msg != new_state.status_msg:
+ notify_reason_counter.inc("status_msg_change")
return True
- if old_state.state == PresenceState.ONLINE:
- if new_state.state != PresenceState.ONLINE:
- # Always notify for online -> anything
- return True
+ if old_state.state != new_state.state:
+ notify_reason_counter.inc("state_change")
+ state_transition_counter.inc(old_state.state, new_state.state)
+ return True
+ if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
+ notify_reason_counter.inc("current_active_change")
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
- if not (old_state.currently_active and new_state.currently_active):
+ if not new_state.currently_active:
+ notify_reason_counter.inc("last_active_change_online")
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
- return True
-
- if old_state.state != new_state.state:
+ notify_reason_counter.inc("last_active_change_not_online")
return True
return False
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index bf6b1c1535..8758af4ca1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -444,6 +444,16 @@ class RoomListHandler(BaseHandler):
self.remote_list_cache = yield deferred
@defer.inlineCallbacks
+ def get_remote_public_room_list(self, server_name):
+ """Fetch the public room list of a single remote server.
+
+ Args:
+ server_name: name of the server whose room list is wanted.
+
+ Returns:
+ Deferred resolving to the entry for `server_name` in the result of
+ the replication layer's `get_public_rooms`.
+
+ Raises:
+ SynapseError: 404 if the result has no entry for `server_name`.
+ """
+ res = yield self.hs.get_replication_layer().get_public_rooms(
+ [server_name]
+ )
+
+ if server_name not in res:
+ raise SynapseError(404, "Server not found")
+ defer.returnValue(res[server_name])
+
+ @defer.inlineCallbacks
def get_aggregated_public_room_list(self):
"""
Get the public room list from this server and the servers
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0b530b9034..3b687957dd 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -199,7 +199,14 @@ class TypingHandler(object):
user_id = content["user_id"]
# Check that the string is a valid user id
- UserID.from_string(user_id)
+ user = UserID.from_string(user_id)
+
+ if user.domain != origin:
+ logger.info(
+ "Got typing update from %r with bad 'user_id': %r",
+ origin, user_id,
+ )
+ return
users = yield self.state.get_current_user_in_room(room_id)
domains = set(get_domain_from_id(u) for u in users)
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 64d8eb2af1..251078ba57 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -16,6 +16,7 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
class SlavedDeviceInboxStore(BaseSlavedStore):
@@ -24,6 +25,10 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
self._device_inbox_id_gen = SlavedIdTracker(
db_conn, "device_inbox", "stream_id",
)
+ self._device_inbox_stream_cache = StreamChangeCache(
+ "DeviceInboxStreamChangeCache",
+ self._device_inbox_id_gen.get_current_token()
+ )
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
@@ -38,5 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
stream = result.get("to_device")
if stream:
self._device_inbox_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ stream_id = row[0]
+ user_id = row[1]
+ self._device_inbox_stream_cache.entity_has_changed(
+ user_id, stream_id
+ )
return super(SlavedDeviceInboxStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cbebd5b2f7..15c52774a2 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -86,6 +86,9 @@ class SlavedEventStore(BaseSlavedStore):
_get_state_groups_from_groups = (
StateStore.__dict__["_get_state_groups_from_groups"]
)
+ _get_state_groups_from_groups_txn = (
+ DataStore._get_state_groups_from_groups_txn.__func__
+ )
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 0d81757010..3c933f1620 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.types import UserID, RoomID, RoomAlias
from synapse.events.utils import serialize_event
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request, parse_string
import logging
import urllib
@@ -295,15 +295,26 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
+ server = parse_string(request, "server", default=None)
+
try:
yield self.auth.get_user_by_req(request)
- except AuthError:
- # This endpoint isn't authed, but its useful to know who's hitting
- # it if they *do* supply an access token
- pass
+ except AuthError as e:
+ # We allow people to not be authed if they're just looking at our
+ # room list, but require auth when we proxy the request.
+ # In both cases we call the auth function, as that has the side
+ # effect of logging who issued this request if an access token was
+ # provided.
+ if server:
+ raise e
+ else:
+ pass
handler = self.hs.get_room_list_handler()
- data = yield handler.get_aggregated_public_room_list()
+ if server:
+ data = yield handler.get_remote_public_room_list(server)
+ else:
+ data = yield handler.get_aggregated_public_room_list()
defer.returnValue((200, data))
diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py
index f1a48acf07..fd2a3d69d4 100644
--- a/synapse/rest/client/v2_alpha/notifications.py
+++ b/synapse/rest/client/v2_alpha/notifications.py
@@ -45,11 +45,12 @@ class NotificationsServlet(RestServlet):
from_token = parse_string(request, "from", required=False)
limit = parse_integer(request, "limit", default=50)
+ only = parse_string(request, "only", required=False)
limit = min(limit, 500)
push_actions = yield self.store.get_push_actions_for_user(
- user_id, from_token, limit
+ user_id, from_token, limit, only_highlight=(only == "highlight")
)
receipts_by_room = yield self.store.get_receipts_for_user_with_orderings(
diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py
index 9c10a99acf..5975164b37 100644
--- a/synapse/rest/client/v2_alpha/sendtodevice.py
+++ b/synapse/rest/client/v2_alpha/sendtodevice.py
@@ -16,10 +16,11 @@
import logging
from twisted.internet import defer
-from synapse.http.servlet import parse_json_object_from_request
from synapse.http import servlet
+from synapse.http.servlet import parse_json_object_from_request
from synapse.rest.client.v1.transactions import HttpTransactionStore
+
from ._base import client_v2_patterns
logger = logging.getLogger(__name__)
@@ -39,10 +40,8 @@ class SendToDeviceRestServlet(servlet.RestServlet):
super(SendToDeviceRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
- self.store = hs.get_datastore()
- self.notifier = hs.get_notifier()
- self.is_mine_id = hs.is_mine_id
self.txns = HttpTransactionStore()
+ self.device_message_handler = hs.get_device_message_handler()
@defer.inlineCallbacks
def on_PUT(self, request, message_type, txn_id):
@@ -57,28 +56,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
content = parse_json_object_from_request(request)
- # TODO: Prod the notifier to wake up sync streams.
- # TODO: Implement replication for the messages.
- # TODO: Send the messages to remote servers if needed.
-
- local_messages = {}
- for user_id, by_device in content["messages"].items():
- if self.is_mine_id(user_id):
- messages_by_device = {
- device_id: {
- "content": message_content,
- "type": message_type,
- "sender": requester.user.to_string(),
- }
- for device_id, message_content in by_device.items()
- }
- if messages_by_device:
- local_messages[user_id] = messages_by_device
-
- stream_id = yield self.store.add_messages_to_device_inbox(local_messages)
-
- self.notifier.on_new_event(
- "to_device_key", stream_id, users=local_messages.keys()
+ sender_user_id = requester.user.to_string()
+
+ yield self.device_message_handler.send_device_message(
+ sender_user_id, message_type, content["messages"]
)
response = (200, {})
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index 9f0625a822..a45ee9483e 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -45,7 +45,14 @@ class DownloadResource(Resource):
@request_handler()
@defer.inlineCallbacks
def _async_render_GET(self, request):
- request.setHeader("Content-Security-Policy", "sandbox")
+ request.setHeader(
+ "Content-Security-Policy",
+ "default-src 'none';"
+ " script-src 'none';"
+ " plugin-types application/pdf;"
+ " style-src 'unsafe-inline';"
+ " object-src 'self';"
+ )
server_name, media_id, name = parse_media_id(request)
if server_name == self.server_name:
yield self._respond_local_file(request, media_id, name)
diff --git a/synapse/server.py b/synapse/server.py
index af3246504b..f516f08167 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -35,6 +35,7 @@ from synapse.federation import initialize_http_replication
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler
+from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.device import DeviceHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler
@@ -100,6 +101,7 @@ class HomeServer(object):
'application_service_api',
'application_service_scheduler',
'application_service_handler',
+ 'device_message_handler',
'notifier',
'distributor',
'client_resource',
@@ -205,6 +207,9 @@ class HomeServer(object):
def build_device_handler(self):
return DeviceHandler(self)
+ def build_device_message_handler(self):
+ """Construct the DeviceMessageHandler for this homeserver."""
+ return DeviceMessageHandler(self)
+
def build_e2e_keys_handler(self):
return E2eKeysHandler(self)
diff --git a/synapse/state.py b/synapse/state.py
index cd792afed1..4520fa0415 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -55,12 +55,15 @@ def _gen_state_id():
class _StateCacheEntry(object):
- __slots__ = ["state", "state_group", "state_id"]
+ __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
- def __init__(self, state, state_group):
+ def __init__(self, state, state_group, prev_group=None, delta_ids=None):
self.state = state
self.state_group = state_group
+ self.prev_group = prev_group
+ self.delta_ids = delta_ids
+
# The `state_id` is a unique ID we generate that can be used as ID for
# this collection of state. Usually this would be the same as the
# state group, but on worker instances we can't generate a new state
@@ -245,11 +248,20 @@ class StateHandler(object):
if key in context.prev_state_ids:
replaces = context.prev_state_ids[key]
event.unsigned["replaces_state"] = replaces
+
context.current_state_ids = dict(context.prev_state_ids)
context.current_state_ids[key] = event.event_id
+
+ context.prev_group = entry.prev_group
+ context.delta_ids = entry.delta_ids
+ if context.delta_ids is not None:
+ context.delta_ids[key] = event.event_id
else:
context.current_state_ids = context.prev_state_ids
+ context.prev_group = entry.prev_group
+ context.delta_ids = entry.delta_ids
+
context.prev_state_events = []
defer.returnValue(context)
@@ -283,6 +295,8 @@ class StateHandler(object):
defer.returnValue(_StateCacheEntry(
state=state_list,
state_group=name,
+ prev_group=name,
+ delta_ids={},
))
with (yield self.resolve_linearizer.queue(group_names)):
@@ -340,9 +354,24 @@ class StateHandler(object):
if hasattr(self.store, "get_next_state_group"):
state_group = self.store.get_next_state_group()
+ prev_group = None
+ delta_ids = None
+ for old_group, old_ids in state_groups_ids.items():
+ if not set(new_state.iterkeys()) - set(old_ids.iterkeys()):
+ n_delta_ids = {
+ k: v
+ for k, v in new_state.items()
+ if old_ids.get(k) != v
+ }
+ if not delta_ids or len(n_delta_ids) < len(delta_ids):
+ prev_group = old_group
+ delta_ids = n_delta_ids
+
cache = _StateCacheEntry(
state=new_state,
state_group=state_group,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
)
if self._state_cache is not None:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 6c32773f25..828e5ca60b 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill,
)
+ max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+ device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
+ db_conn, "device_inbox",
+ entity_column="user_id",
+ stream_column="stream_id",
+ max_value=max_device_inbox_id
+ )
+ self._device_inbox_stream_cache = StreamChangeCache(
+ "DeviceInboxStreamChangeCache", min_device_inbox_id,
+ prefilled_cache=device_inbox_prefill,
+ )
+ # The federation outbox and the local device inbox use the same
+ # stream_id generator.
+ device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
+ db_conn, "device_federation_outbox",
+ entity_column="destination",
+ stream_column="stream_id",
+ max_value=max_device_inbox_id,
+ )
+ self._device_federation_outbox_stream_cache = StreamChangeCache(
+ "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id,
+ prefilled_cache=device_outbox_prefill,
+ )
+
cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 30d0e4c5dc..003f5ba203 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -133,10 +133,12 @@ class BackgroundUpdateStore(SQLBaseStore):
updates = yield self._simple_select_list(
"background_updates",
keyvalues=None,
- retcols=("update_name",),
+ retcols=("update_name", "depends_on"),
)
+ in_flight = set(update["update_name"] for update in updates)
for update in updates:
- self._background_update_queue.append(update['update_name'])
+ if update["depends_on"] not in in_flight:
+ self._background_update_queue.append(update['update_name'])
if not self._background_update_queue:
# no work left to do
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 68116b0394..658fbef27b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -27,63 +27,157 @@ logger = logging.getLogger(__name__)
class DeviceInboxStore(SQLBaseStore):
@defer.inlineCallbacks
- def add_messages_to_device_inbox(self, messages_by_user_then_device):
- """
+ def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
+ remote_messages_by_destination):
+ """Used to send messages from this server.
+
Args:
- messages_by_user_and_device(dict):
+ sender_user_id(str): The ID of the user sending these messages.
+ local_messages_by_user_then_device(dict):
Dictionary of user_id to device_id to message.
+ remote_messages_by_destination(dict):
+ Dictionary of destination server_name to the EDU JSON to send.
Returns:
A deferred stream_id that resolves when the messages have been
inserted.
"""
- def select_devices_txn(txn, user_id, devices):
- if not devices:
- return []
- sql = (
- "SELECT user_id, device_id FROM devices"
- " WHERE user_id = ? AND device_id IN ("
- + ",".join("?" * len(devices))
- + ")"
+ def add_messages_txn(txn, now_ms, stream_id):
+ # Add the local messages directly to the local inbox.
+ self._add_messages_to_local_device_inbox_txn(
+ txn, stream_id, local_messages_by_user_then_device
)
- # TODO: Maybe this needs to be done in batches if there are
- # too many local devices for a given user.
- args = [user_id] + devices
- txn.execute(sql, args)
- return [tuple(row) for row in txn.fetchall()]
-
- def add_messages_to_device_inbox_txn(txn, stream_id):
- local_users_and_devices = set()
- for user_id, messages_by_device in messages_by_user_then_device.items():
- local_users_and_devices.update(
- select_devices_txn(txn, user_id, messages_by_device.keys())
- )
+ # Add the remote messages to the federation outbox.
+ # We'll send them to a remote server when we next send a
+ # federation transaction to that destination.
sql = (
- "INSERT INTO device_inbox"
- " (user_id, device_id, stream_id, message_json)"
+ "INSERT INTO device_federation_outbox"
+ " (destination, stream_id, queued_ts, messages_json)"
" VALUES (?,?,?,?)"
)
rows = []
- for user_id, messages_by_device in messages_by_user_then_device.items():
- for device_id, message in messages_by_device.items():
- message_json = ujson.dumps(message)
- # Only insert into the local inbox if the device exists on
- # this server
- if (user_id, device_id) in local_users_and_devices:
- rows.append((user_id, device_id, stream_id, message_json))
-
+ for destination, edu in remote_messages_by_destination.items():
+ edu_json = ujson.dumps(edu)
+ rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)
with self._device_inbox_id_gen.get_next() as stream_id:
+ now_ms = self.clock.time_msec()
yield self.runInteraction(
"add_messages_to_device_inbox",
- add_messages_to_device_inbox_txn,
- stream_id
+ add_messages_txn,
+ now_ms,
+ stream_id,
)
+ for user_id in local_messages_by_user_then_device.keys():
+ self._device_inbox_stream_cache.entity_has_changed(
+ user_id, stream_id
+ )
+ for destination in remote_messages_by_destination.keys():
+ self._device_federation_outbox_stream_cache.entity_has_changed(
+ destination, stream_id
+ )
defer.returnValue(self._device_inbox_id_gen.get_current_token())
+ @defer.inlineCallbacks
+ def add_messages_from_remote_to_device_inbox(
+ self, origin, message_id, local_messages_by_user_then_device
+ ):
+ def add_messages_txn(txn, now_ms, stream_id):
+ # Check if we've already inserted a matching message_id for that
+ # origin. This can happen if the origin doesn't receive our
+ # acknowledgement from the first time we received the message.
+ already_inserted = self._simple_select_one_txn(
+ txn, table="device_federation_inbox",
+ keyvalues={"origin": origin, "message_id": message_id},
+ retcols=("message_id",),
+ allow_none=True,
+ )
+ if already_inserted is not None:
+ return
+
+ # Add an entry for this message_id so that we know we've processed
+ # it.
+ self._simple_insert_txn(
+ txn, table="device_federation_inbox",
+ values={
+ "origin": origin,
+ "message_id": message_id,
+ "received_ts": now_ms,
+ },
+ )
+
+ # Add the messages to the appropriate local device inboxes so that
+ # they'll be sent to the devices when they next sync.
+ self._add_messages_to_local_device_inbox_txn(
+ txn, stream_id, local_messages_by_user_then_device
+ )
+
+ with self._device_inbox_id_gen.get_next() as stream_id:
+ now_ms = self.clock.time_msec()
+ yield self.runInteraction(
+ "add_messages_from_remote_to_device_inbox",
+ add_messages_txn,
+ now_ms,
+ stream_id,
+ )
+ for user_id in local_messages_by_user_then_device.keys():
+ self._device_inbox_stream_cache.entity_has_changed(
+ user_id, stream_id
+ )
+
+ def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
+ messages_by_user_then_device):
+ local_by_user_then_device = {}
+ for user_id, messages_by_device in messages_by_user_then_device.items():
+ messages_json_for_user = {}
+ devices = messages_by_device.keys()
+ if len(devices) == 1 and devices[0] == "*":
+ # Handle wildcard device_ids.
+ sql = (
+ "SELECT device_id FROM devices"
+ " WHERE user_id = ?"
+ )
+ txn.execute(sql, (user_id,))
+ message_json = ujson.dumps(messages_by_device["*"])
+ for row in txn.fetchall():
+ # Add the message for all devices for this user on this
+ # server.
+ device = row[0]
+ messages_json_for_user[device] = message_json
+ else:
+ sql = (
+ "SELECT device_id FROM devices"
+ " WHERE user_id = ? AND device_id IN ("
+ + ",".join("?" * len(devices))
+ + ")"
+ )
+ # TODO: Maybe this needs to be done in batches if there are
+ # too many local devices for a given user.
+ txn.execute(sql, [user_id] + devices)
+ for row in txn.fetchall():
+ # Only insert into the local inbox if the device exists on
+ # this server
+ device = row[0]
+ message_json = ujson.dumps(messages_by_device[device])
+ messages_json_for_user[device] = message_json
+
+ local_by_user_then_device[user_id] = messages_json_for_user
+
+ sql = (
+ "INSERT INTO device_inbox"
+ " (user_id, device_id, stream_id, message_json)"
+ " VALUES (?,?,?,?)"
+ )
+ rows = []
+ for user_id, messages_by_device in local_by_user_then_device.items():
+ for device_id, message_json in messages_by_device.items():
+ rows.append((user_id, device_id, stream_id, message_json))
+
+ txn.executemany(sql, rows)
+
def get_new_messages_for_device(
self, user_id, device_id, last_stream_id, current_stream_id, limit=100
):
@@ -97,6 +191,12 @@ class DeviceInboxStore(SQLBaseStore):
Deferred ([dict], int): List of messages for the device and where
in the stream the messages got to.
"""
+ has_changed = self._device_inbox_stream_cache.has_entity_changed(
+ user_id, last_stream_id
+ )
+ if not has_changed:
+ return defer.succeed(([], current_stream_id))
+
def get_new_messages_for_device_txn(txn):
sql = (
"SELECT stream_id, message_json FROM device_inbox"
@@ -182,3 +282,71 @@ class DeviceInboxStore(SQLBaseStore):
def get_to_device_stream_token(self):
return self._device_inbox_id_gen.get_current_token()
+
+ def get_new_device_msgs_for_remote(
+ self, destination, last_stream_id, current_stream_id, limit=100
+ ):
+ """
+ Args:
+ destination(str): The name of the remote server.
+ last_stream_id(int): The last position of the device message stream
+ that the server sent up to.
+ current_stream_id(int): The current position of the device
+ message stream.
+ Returns:
+ Deferred ([dict], int): List of messages for the destination and where
+ in the stream the messages got to.
+ """
+
+ has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
+ destination, last_stream_id
+ )
+ if not has_changed:
+ return defer.succeed(([], current_stream_id))
+
+ def get_new_messages_for_remote_destination_txn(txn):
+ sql = (
+ "SELECT stream_id, messages_json FROM device_federation_outbox"
+ " WHERE destination = ?"
+ " AND ? < stream_id AND stream_id <= ?"
+ " ORDER BY stream_id ASC"
+ " LIMIT ?"
+ )
+ txn.execute(sql, (
+ destination, last_stream_id, current_stream_id, limit
+ ))
+ messages = []
+ for row in txn.fetchall():
+ stream_pos = row[0]
+ messages.append(ujson.loads(row[1]))
+ if len(messages) < limit:
+ stream_pos = current_stream_id
+ return (messages, stream_pos)
+
+ return self.runInteraction(
+ "get_new_device_msgs_for_remote",
+ get_new_messages_for_remote_destination_txn,
+ )
+
+ def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
+ """Used to delete messages when the remote destination acknowledges
+ their receipt.
+
+ Args:
+ destination(str): The destination server_name
+ up_to_stream_id(int): Where to delete messages up to.
+ Returns:
+ A deferred that resolves when the messages have been deleted.
+ """
+ def delete_messages_for_remote_destination_txn(txn):
+ sql = (
+ "DELETE FROM device_federation_outbox"
+ " WHERE destination = ?"
+ " AND stream_id <= ?"
+ )
+ txn.execute(sql, (destination, up_to_stream_id))
+
+ return self.runInteraction(
+ "delete_device_msgs_for_remote",
+ delete_messages_for_remote_destination_txn
+ )
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index afd6530cab..17920d4480 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -54,8 +54,12 @@ class DeviceStore(SQLBaseStore):
or_ignore=ignore_if_known,
)
except Exception as e:
- logger.error("store_device with device_id=%s failed: %s",
- device_id, e)
+ logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
+ " display_name=%s(%r) failed: %s",
+ type(device_id).__name__, device_id,
+ type(user_id).__name__, user_id,
+ type(initial_device_display_name).__name__,
+ initial_device_display_name, e)
raise StoreError(500, "Problem storing device.")
def get_device(self, user_id, device_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index eb15fb751b..10e9305f7b 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -338,14 +338,21 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue(notifs[:limit])
@defer.inlineCallbacks
- def get_push_actions_for_user(self, user_id, before=None, limit=50):
+ def get_push_actions_for_user(self, user_id, before=None, limit=50,
+ only_highlight=False):
def f(txn):
before_clause = ""
if before:
- before_clause = "AND stream_ordering < ?"
+ before_clause = "AND epa.stream_ordering < ?"
args = [user_id, before, limit]
else:
args = [user_id, limit]
+
+ if only_highlight:
+ if len(before_clause) > 0:
+ before_clause += " "
+ before_clause += "AND epa.highlight = 1"
+
sql = (
"SELECT epa.event_id, epa.room_id,"
" epa.stream_ordering, epa.topological_ordering,"
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1a7d4c5199..ed182c8d11 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore):
# insert into the state_group, state_groups_state and
# event_to_state_groups tables.
- self._store_mult_state_groups_txn(txn, ((event, context),))
+ try:
+ self._store_mult_state_groups_txn(txn, ((event, context),))
+ except Exception:
+ logger.exception("")
+ raise
metadata_json = encode_json(
event.internal_metadata.get_dict()
@@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore):
)
event_rows = txn.fetchall()
+ for event_id, state_key in event_rows:
+ txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
@@ -1582,7 +1589,66 @@ class EventsStore(SQLBaseStore):
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(room_id, topological_ordering, topological_ordering)
)
+
state_rows = txn.fetchall()
+ state_groups_to_delete = [sg for sg, in state_rows]
+
+ # Now we get all the state groups that rely on these state groups
+ new_state_edges = []
+ chunks = [
+ state_groups_to_delete[i:i + 100]
+ for i in xrange(0, len(state_groups_to_delete), 100)
+ ]
+ for chunk in chunks:
+ rows = self._simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=chunk,
+ retcols=["state_group"],
+ keyvalues={},
+ )
+ new_state_edges.extend(row["state_group"] for row in rows)
+
+ # Now we convert the state groups that reference to-be-deleted state
+ # groups into non-delta versions.
+ for new_state_edge in new_state_edges:
+ curr_state = self._get_state_groups_from_groups_txn(
+ txn, [new_state_edge], types=None
+ )
+ curr_state = curr_state[new_state_edge]
+
+ self._simple_delete_txn(
+ txn,
+ table="state_groups_state",
+ keyvalues={
+ "state_group": new_state_edge,
+ }
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={
+ "state_group": new_state_edge,
+ }
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": new_state_edge,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in curr_state.items()
+ ],
+ )
+
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b94ce7bea1..7efbe51cda 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 34
+SCHEMA_VERSION = 35
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -242,7 +242,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
module = imp.load_source(
module_name, absolute_path, python_file
)
- logger.debug("Running script %s", relative_path)
+ logger.info("Running script %s", relative_path)
module.run_create(cur, database_engine)
if not is_empty:
module.run_upgrade(cur, database_engine, config=config)
@@ -253,7 +253,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
pass
elif ext == ".sql":
# A plain old .sql file, just read and execute it
- logger.debug("Applying schema %s", relative_path)
+ logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
else:
# Not a valid delta file.
diff --git a/synapse/storage/schema/delta/34/device_outbox.sql b/synapse/storage/schema/delta/34/device_outbox.sql
new file mode 100644
index 0000000000..e87066d9a1
--- /dev/null
+++ b/synapse/storage/schema/delta/34/device_outbox.sql
@@ -0,0 +1,36 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE device_federation_outbox (
+ destination TEXT NOT NULL,
+ stream_id BIGINT NOT NULL,
+ queued_ts BIGINT NOT NULL,
+ messages_json TEXT NOT NULL
+);
+
+
+CREATE INDEX device_federation_outbox_destination_id
+ ON device_federation_outbox(destination, stream_id);
+
+
+CREATE TABLE device_federation_inbox (
+ origin TEXT NOT NULL,
+ message_id TEXT NOT NULL,
+ received_ts BIGINT NOT NULL
+);
+
+
+CREATE INDEX device_federation_inbox_sender_id
+ ON device_federation_inbox(origin, message_id);
diff --git a/synapse/storage/schema/delta/35/add_state_index.sql b/synapse/storage/schema/delta/35/add_state_index.sql
new file mode 100644
index 0000000000..0fce26345b
--- /dev/null
+++ b/synapse/storage/schema/delta/35/add_state_index.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+ALTER TABLE background_updates ADD COLUMN depends_on TEXT;
+
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES ('state_group_state_type_index', '{}', 'state_group_state_deduplication');
diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql
new file mode 100644
index 0000000000..0f1fa68a89
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE state_group_edges(
+ state_group BIGINT NOT NULL,
+ prev_state_group BIGINT NOT NULL
+);
+
+CREATE INDEX state_group_edges_idx ON state_group_edges(state_group);
+CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group);
diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql
new file mode 100644
index 0000000000..97e5067ef4
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state_dedupe.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('state_group_state_deduplication', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ec551b0b4f..0cff0a0cda 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,6 +16,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches import intern_string
+from synapse.storage.engines import PostgresEngine
from twisted.internet import defer
@@ -24,6 +25,9 @@ import logging
logger = logging.getLogger(__name__)
+MAX_STATE_DELTA_HOPS = 100
+
+
class StateStore(SQLBaseStore):
""" Keeps track of the state at a given event.
@@ -43,6 +47,20 @@ class StateStore(SQLBaseStore):
* `state_groups_state`: Maps state group to state events.
"""
+ STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
+ STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
+
+ def __init__(self, hs):
+ super(StateStore, self).__init__(hs)
+ self.register_background_update_handler(
+ self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
+ self._background_deduplicate_state,
+ )
+ self.register_background_update_handler(
+ self.STATE_GROUP_INDEX_UPDATE_NAME,
+ self._background_index_state,
+ )
+
@defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids):
if not event_ids:
@@ -103,11 +121,8 @@ class StateStore(SQLBaseStore):
state_groups[event.event_id] = context.state_group
if self._have_persisted_state_group_txn(txn, context.state_group):
- logger.info("Already persisted state_group: %r", context.state_group)
continue
- state_event_ids = dict(context.current_state_ids)
-
self._simple_insert_txn(
txn,
table="state_groups",
@@ -118,20 +133,51 @@ class StateStore(SQLBaseStore):
},
)
- self._simple_insert_many_txn(
- txn,
- table="state_groups_state",
- values=[
- {
+ # We persist as a delta if we can, while also ensuring the chain
+ # of deltas isn't too long, as otherwise read performance degrades.
+ if context.prev_group:
+ potential_hops = self._count_state_group_hops_txn(
+ txn, context.prev_group
+ )
+ if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
+ self._simple_insert_txn(
+ txn,
+ table="state_group_edges",
+ values={
"state_group": context.state_group,
- "room_id": event.room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": state_id,
- }
- for key, state_id in state_event_ids.items()
- ],
- )
+ "prev_state_group": context.prev_group,
+ },
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": context.state_group,
+ "room_id": event.room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in context.delta_ids.items()
+ ],
+ )
+ else:
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": context.state_group,
+ "room_id": event.room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in context.current_state_ids.items()
+ ],
+ )
self._simple_insert_many_txn(
txn,
@@ -145,6 +191,47 @@ class StateStore(SQLBaseStore):
],
)
+ def _count_state_group_hops_txn(self, txn, state_group):
+ """Given a state group, count how many hops there are in the tree.
+
+ This is used to ensure the delta chains don't get too long.
+ """
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = ("""
+ WITH RECURSIVE state(state_group) AS (
+ VALUES(?::bigint)
+ UNION ALL
+ SELECT prev_state_group FROM state_group_edges e, state s
+ WHERE s.state_group = e.state_group
+ )
+ SELECT count(*) FROM state;
+ """)
+
+ txn.execute(sql, (state_group,))
+ row = txn.fetchone()
+ if row and row[0]:
+ return row[0]
+ else:
+ return 0
+ else:
+ # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+ # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+ next_group = state_group
+ count = 0
+
+ while next_group:
+ next_group = self._simple_select_one_onecol_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={"state_group": next_group},
+ retcol="prev_state_group",
+ allow_none=True,
+ )
+ if next_group:
+ count += 1
+
+ return count
+
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
if event_type and state_key is not None:
@@ -206,48 +293,108 @@ class StateStore(SQLBaseStore):
def _get_state_groups_from_groups(self, groups, types):
"""Returns dictionary state_group -> (dict of (type, state_key) -> event id)
"""
- def f(txn, groups):
- if types is not None:
- where_clause = "AND (%s)" % (
- " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
- )
- else:
- where_clause = ""
-
- sql = (
- "SELECT state_group, event_id, type, state_key"
- " FROM state_groups_state WHERE"
- " state_group IN (%s) %s" % (
- ",".join("?" for _ in groups),
- where_clause,
- )
- )
-
- args = list(groups)
- if types is not None:
- args.extend([i for typ in types for i in typ])
-
- txn.execute(sql, args)
- rows = self.cursor_to_dict(txn)
-
- results = {group: {} for group in groups}
- for row in rows:
- key = (row["type"], row["state_key"])
- results[row["state_group"]][key] = row["event_id"]
- return results
-
results = {}
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
for chunk in chunks:
res = yield self.runInteraction(
"_get_state_groups_from_groups",
- f, chunk
+ self._get_state_groups_from_groups_txn, chunk, types,
)
results.update(res)
defer.returnValue(results)
+ def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+ if types is not None:
+ where_clause = "AND (%s)" % (
+ " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
+ )
+ else:
+ where_clause = ""
+
+ results = {group: {} for group in groups}
+ if isinstance(self.database_engine, PostgresEngine):
+ # Temporarily disable sequential scans in this transaction. This is
+ # a temporary hack until we can add the right indices in
+ txn.execute("SET LOCAL enable_seqscan=off")
+
+ # The below query walks the state_group tree so that the "state"
+ # table includes all state_groups in the tree. It then joins
+ # against `state_groups_state` to fetch the latest state.
+ # It assumes that previous state groups are always numerically
+ # lesser.
+ # The PARTITION is used to get the event_id in the greatest state
+ # group for the given type, state_key.
+ # This may return multiple rows per (type, state_key), but last_value
+ # should be the same.
+ sql = ("""
+ WITH RECURSIVE state(state_group) AS (
+ VALUES(?::bigint)
+ UNION ALL
+ SELECT prev_state_group FROM state_group_edges e, state s
+ WHERE s.state_group = e.state_group
+ )
+ SELECT type, state_key, last_value(event_id) OVER (
+ PARTITION BY type, state_key ORDER BY state_group ASC
+ ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ ) AS event_id FROM state_groups_state
+ WHERE state_group IN (
+ SELECT state_group FROM state
+ )
+ %s;
+ """) % (where_clause,)
+
+ for group in groups:
+ args = [group]
+ if types is not None:
+ args.extend([i for typ in types for i in typ])
+
+ txn.execute(sql, args)
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ key = (row["type"], row["state_key"])
+ results[group][key] = row["event_id"]
+ else:
+ # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+ # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+ for group in groups:
+ group_tree = [group]
+ next_group = group
+
+ while next_group:
+ next_group = self._simple_select_one_onecol_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={"state_group": next_group},
+ retcol="prev_state_group",
+ allow_none=True,
+ )
+ if next_group:
+ group_tree.append(next_group)
+
+ sql = ("""
+ SELECT type, state_key, event_id FROM state_groups_state
+ INNER JOIN (
+ SELECT type, state_key, max(state_group) as state_group
+ FROM state_groups_state
+ WHERE state_group IN (%s) %s
+ GROUP BY type, state_key
+ ) USING (type, state_key, state_group);
+ """) % (",".join("?" for _ in group_tree), where_clause,)
+
+ args = list(group_tree)
+ if types is not None:
+ args.extend([i for typ in types for i in typ])
+
+ txn.execute(sql, args)
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ key = (row["type"], row["state_key"])
+ results[group][key] = row["event_id"]
+
+ return results
+
@defer.inlineCallbacks
def get_state_for_events(self, event_ids, types):
"""Given a list of event_ids and type tuples, return a list of state
@@ -504,32 +651,178 @@ class StateStore(SQLBaseStore):
defer.returnValue(results)
- def get_all_new_state_groups(self, last_id, current_id, limit):
- def get_all_new_state_groups_txn(txn):
- sql = (
- "SELECT id, room_id, event_id FROM state_groups"
- " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+ def get_next_state_group(self):
+ return self._state_groups_id_gen.get_next()
+
+ @defer.inlineCallbacks
+ def _background_deduplicate_state(self, progress, batch_size):
+ """This background update will slowly deduplicate state by reencoding
+ them as deltas.
+ """
+ last_state_group = progress.get("last_state_group", 0)
+ rows_inserted = progress.get("rows_inserted", 0)
+ max_group = progress.get("max_group", None)
+
+ BATCH_SIZE_SCALE_FACTOR = 100
+
+ batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR))
+
+ if max_group is None:
+ rows = yield self._execute(
+ "_background_deduplicate_state", None,
+ "SELECT coalesce(max(id), 0) FROM state_groups",
)
- txn.execute(sql, (last_id, current_id, limit))
- groups = txn.fetchall()
+ max_group = rows[0][0]
+
+ def reindex_txn(txn):
+ new_last_state_group = last_state_group
+ for count in xrange(batch_size):
+ txn.execute(
+ "SELECT id, room_id FROM state_groups"
+ " WHERE ? < id AND id <= ?"
+ " ORDER BY id ASC"
+ " LIMIT 1",
+ (new_last_state_group, max_group,)
+ )
+ row = txn.fetchone()
+ if row:
+ state_group, room_id = row
- if not groups:
- return ([], [])
+ if not row or not state_group:
+ return True, count
- lower_bound = groups[0][0]
- upper_bound = groups[-1][0]
- sql = (
- "SELECT state_group, type, state_key, event_id"
- " FROM state_groups_state"
- " WHERE ? <= state_group AND state_group <= ?"
+ txn.execute(
+ "SELECT state_group FROM state_group_edges"
+ " WHERE state_group = ?",
+ (state_group,)
+ )
+
+ # If we reach a point where we've already started inserting
+ # edges we should stop.
+ if txn.fetchall():
+ return True, count
+
+ txn.execute(
+ "SELECT coalesce(max(id), 0) FROM state_groups"
+ " WHERE id < ? AND room_id = ?",
+ (state_group, room_id,)
+ )
+ prev_group, = txn.fetchone()
+ new_last_state_group = state_group
+
+ if prev_group:
+ potential_hops = self._count_state_group_hops_txn(
+ txn, prev_group
+ )
+ if potential_hops >= MAX_STATE_DELTA_HOPS:
+ # We want to ensure chains are at most this long,
+ # otherwise read performance degrades.
+ continue
+
+ prev_state = self._get_state_groups_from_groups_txn(
+ txn, [prev_group], types=None
+ )
+ prev_state = prev_state[prev_group]
+
+ curr_state = self._get_state_groups_from_groups_txn(
+ txn, [state_group], types=None
+ )
+ curr_state = curr_state[state_group]
+
+ if not set(prev_state.keys()) - set(curr_state.keys()):
+ # We can only do a delta if the current state's keys are a
+ # superset of the previous state's keys (equal key sets qualify).
+
+ delta_state = {
+ key: value for key, value in curr_state.items()
+ if prev_state.get(key, None) != value
+ }
+
+ self._simple_delete_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={
+ "state_group": state_group,
+ }
+ )
+
+ self._simple_insert_txn(
+ txn,
+ table="state_group_edges",
+ values={
+ "state_group": state_group,
+ "prev_state_group": prev_group,
+ }
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="state_groups_state",
+ keyvalues={
+ "state_group": state_group,
+ }
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": state_group,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in delta_state.items()
+ ],
+ )
+
+ progress = {
+ "last_state_group": state_group,
+ "rows_inserted": rows_inserted + batch_size,
+ "max_group": max_group,
+ }
+
+ self._background_update_progress_txn(
+ txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress
)
- txn.execute(sql, (lower_bound, upper_bound))
- state_group_state = txn.fetchall()
- return (groups, state_group_state)
- return self.runInteraction(
- "get_all_new_state_groups", get_all_new_state_groups_txn
+ return False, batch_size
+
+ finished, result = yield self.runInteraction(
+ self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn
)
- def get_next_state_group(self):
- return self._state_groups_id_gen.get_next()
+ if finished:
+ yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME)
+
+ defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR)
+
+ @defer.inlineCallbacks
+ def _background_index_state(self, progress, batch_size):
+ def reindex_txn(txn):
+ if isinstance(self.database_engine, PostgresEngine):
+ txn.execute(
+ "CREATE INDEX state_groups_state_type_idx"
+ " ON state_groups_state(state_group, type, state_key)"
+ )
+ txn.execute(
+ "DROP INDEX IF EXISTS state_groups_state_id"
+ )
+ else:
+ txn.execute(
+ "CREATE INDEX state_groups_state_type_idx"
+ " ON state_groups_state(state_group, type, state_key)"
+ )
+ txn.execute(
+ "DROP INDEX IF EXISTS state_groups_state_id"
+ )
+
+ yield self.runInteraction(
+ self.STATE_GROUP_INDEX_UPDATE_NAME, reindex_txn
+ )
+
+ yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
+
+ defer.returnValue(1)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index b2957eef9f..ea1f0f7c33 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -121,6 +121,14 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.auth.check_joined_room = check_joined_room
+ self.datastore.get_to_device_stream_token = lambda: 0
+ self.datastore.get_new_device_msgs_for_remote = (
+ lambda *args, **kargs: ([], 0)
+ )
+ self.datastore.delete_device_msgs_for_remote = (
+ lambda *args, **kargs: None
+ )
+
# Some local users to test with
self.u_apple = UserID.from_string("@apple:test")
self.u_banana = UserID.from_string("@banana:test")
|