summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/account_data.py50
-rw-r--r--synapse/storage/event_push_actions.py49
-rw-r--r--synapse/storage/events.py162
-rw-r--r--synapse/storage/pusher.py27
-rw-r--r--synapse/storage/receipts.py6
-rw-r--r--synapse/storage/registration.py3
-rw-r--r--synapse/storage/roommember.py7
-rw-r--r--synapse/storage/schema/delta/32/events.sql16
-rw-r--r--synapse/storage/schema/delta/32/pusher_throttle.sql23
-rw-r--r--synapse/storage/schema/delta/32/remove_indices.sql38
10 files changed, 349 insertions, 32 deletions
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 7a7fbf1e52..ec7e8d40d2 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@
 from ._base import SQLBaseStore
 from twisted.internet import defer
 
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+
 import ujson as json
 import logging
 
@@ -24,6 +26,7 @@ logger = logging.getLogger(__name__)
 
 class AccountDataStore(SQLBaseStore):
 
+    @cached()
     def get_account_data_for_user(self, user_id):
         """Get all the client account_data for a user.
 
@@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore):
             "get_account_data_for_user", get_account_data_for_user_txn
         )
 
+    @cachedInlineCallbacks(num_args=2)
+    def get_global_account_data_by_type_for_user(self, data_type, user_id):
+        """
+        Returns:
+            Deferred: A dict
+        """
+        result = yield self._simple_select_one_onecol(
+            table="account_data",
+            keyvalues={
+                "user_id": user_id,
+                "account_data_type": data_type,
+            },
+            retcol="content",
+            desc="get_global_account_data_by_type_for_user",
+            allow_none=True,
+        )
+
+        if result:
+            defer.returnValue(json.loads(result))
+        else:
+            defer.returnValue(None)
+
+    @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
+                num_args=2, list_name="user_ids", inlineCallbacks=True)
+    def get_global_account_data_by_type_for_users(self, data_type, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table="account_data",
+            column="user_id",
+            iterable=user_ids,
+            keyvalues={
+                "account_data_type": data_type,
+            },
+            retcols=("user_id", "content",),
+            desc="get_global_account_data_by_type_for_users",
+        )
+
+        defer.returnValue({
+            row["user_id"]: json.loads(row["content"]) if row["content"] else None
+            for row in rows
+        })
+
     def get_account_data_for_room(self, user_id, room_id):
         """Get all the client account_data for a user for a room.
 
@@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore):
                 self._account_data_stream_cache.entity_has_changed,
                 user_id, next_id,
             )
+            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
             self._update_max_stream_id(txn, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
@@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore):
                 self._account_data_stream_cache.entity_has_changed,
                 user_id, next_id,
             )
+            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
+            txn.call_after(
+                self.get_global_account_data_by_type_for_user.invalidate,
+                (account_data_type, user_id,)
+            )
             self._update_max_stream_id(txn, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 312c0071f1..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore):
                                                   max_stream_ordering=None):
         def get_after_receipt(txn):
             sql = (
-                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
-                "FROM event_push_actions AS ep, ("
-                "   SELECT room_id, user_id,"
-                "       max(topological_ordering) as topological_ordering,"
-                "       max(stream_ordering) as stream_ordering"
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+                "e.received_ts "
+                "FROM ("
+                "   SELECT room_id, user_id, "
+                "       max(topological_ordering) as topological_ordering, "
+                "       max(stream_ordering) as stream_ordering "
                 "       FROM events"
                 "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
                 "   GROUP BY room_id, user_id"
-                ") AS rl "
-                "WHERE"
+                ") AS rl,"
+                " event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
                 "   ep.room_id = rl.room_id"
                 "   AND ("
                 "       ep.topological_ordering > rl.topological_ordering"
@@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore):
 
         def get_no_receipt(txn):
             sql = (
-                "SELECT ep.event_id, ep.stream_ordering, ep.actions "
-                "FROM event_push_actions AS ep "
-                "WHERE ep.room_id not in ("
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                " e.received_ts"
+                " FROM event_push_actions AS ep"
+                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+                " WHERE ep.room_id not in ("
                 "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
-                "   WHERE receipt_type = 'm.read' AND user_id = ? "
+                "   WHERE receipt_type = 'm.read' AND user_id = ?"
                 "   GROUP BY room_id"
                 ") AND ep.user_id = ? AND ep.stream_ordering > ?"
             )
@@ -175,12 +180,30 @@ class EventPushActionsStore(SQLBaseStore):
         defer.returnValue([
             {
                 "event_id": row[0],
-                "stream_ordering": row[1],
-                "actions": json.loads(row[2]),
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+                "received_ts": row[4],
             } for row in after_read_receipt + no_read_receipt
         ])
 
     @defer.inlineCallbacks
+    def get_time_of_last_push_action_before(self, stream_ordering):
+        def f(txn):
+            sql = (
+                "SELECT e.received_ts"
+                " FROM event_push_actions AS ep"
+                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+                " WHERE ep.stream_ordering > ?"
+                " ORDER BY ep.stream_ordering ASC"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (stream_ordering,))
+            return txn.fetchone()
+        result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+        defer.returnValue(result[0] if result else None)
+
+    @defer.inlineCallbacks
     def get_latest_push_action_stream_ordering(self):
         def f(txn):
             txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0307b2af3c..4655669ba0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,14 @@ from twisted.internet import defer, reactor
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
+from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from collections import namedtuple
+from collections import deque, namedtuple
+
 
 import logging
 import math
@@ -50,28 +52,169 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+class _EventPeristenceQueue(object):
+    """Queues up events so that they can be persisted in bulk with only one
+    concurrent transaction per room.
+    """
+
+    _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+        "events_and_contexts", "current_state", "backfilled", "deferred",
+    ))
+
+    def __init__(self):
+        self._event_persist_queues = {}
+        self._currently_persisting_rooms = set()
+
+    def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+        """Add events to the queue, with the given persist_event options.
+        """
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+        if queue:
+            end_item = queue[-1]
+            if end_item.current_state or current_state:
+                # We perist events with current_state set to True one at a time
+                pass
+            if end_item.backfilled == backfilled:
+                end_item.events_and_contexts.extend(events_and_contexts)
+                return end_item.deferred.observe()
+
+        deferred = ObservableDeferred(defer.Deferred())
+
+        queue.append(self._EventPersistQueueItem(
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+            current_state=current_state,
+            deferred=deferred,
+        ))
+
+        return deferred.observe()
+
+    def handle_queue(self, room_id, per_item_callback):
+        """Attempts to handle the queue for a room if not already being handled.
+
+        The given callback will be invoked with for each item in the queue,1
+        of type _EventPersistQueueItem. The per_item_callback will continuously
+        be called with new items, unless the queue becomnes empty. The return
+        value of the function will be given to the deferreds waiting on the item,
+        exceptions will be passed to the deferres as well.
+
+        This function should therefore be called whenever anything is added
+        to the queue.
+
+        If another callback is currently handling the queue then it will not be
+        invoked.
+        """
+
+        if room_id in self._currently_persisting_rooms:
+            return
+
+        self._currently_persisting_rooms.add(room_id)
+
+        @defer.inlineCallbacks
+        def handle_queue_loop():
+            try:
+                queue = self._get_drainining_queue(room_id)
+                for item in queue:
+                    try:
+                        ret = yield per_item_callback(item)
+                        item.deferred.callback(ret)
+                    except Exception as e:
+                        item.deferred.errback(e)
+            finally:
+                queue = self._event_persist_queues.pop(room_id, None)
+                if queue:
+                    self._event_persist_queues[room_id] = queue
+                self._currently_persisting_rooms.discard(room_id)
+
+        preserve_fn(handle_queue_loop)()
+
+    def _get_drainining_queue(self, room_id):
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+
+        try:
+            while True:
+                yield queue.popleft()
+        except IndexError:
+            # Queue has been drained.
+            pass
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
 
     def __init__(self, hs):
         super(EventsStore, self).__init__(hs)
+        self._clock = hs.get_clock()
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
 
-    @defer.inlineCallbacks
+        self._event_persist_queue = _EventPeristenceQueue()
+
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
             backfilled: ?
+        """
+        partitioned = {}
+        for event, ctx in events_and_contexts:
+            partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+        deferreds = []
+        for room_id, evs_ctxs in partitioned.items():
+            d = self._event_persist_queue.add_to_queue(
+                room_id, evs_ctxs,
+                backfilled=backfilled,
+                current_state=None,
+            )
+            deferreds.append(d)
 
-        Returns: Tuple of stream_orderings where the first is the minimum and
-            last is the maximum stream ordering assigned to the events when
-            persisting.
+        for room_id in partitioned.keys():
+            self._maybe_start_persisting(room_id)
 
-        """
+        return defer.gatherResults(deferreds, consumeErrors=True)
+
+    @defer.inlineCallbacks
+    @log_function
+    def persist_event(self, event, context, current_state=None, backfilled=False):
+        deferred = self._event_persist_queue.add_to_queue(
+            event.room_id, [(event, context)],
+            backfilled=backfilled,
+            current_state=current_state,
+        )
+
+        self._maybe_start_persisting(event.room_id)
+
+        yield deferred
+
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+        defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+    def _maybe_start_persisting(self, room_id):
+        @defer.inlineCallbacks
+        def persisting_queue(item):
+            if item.current_state:
+                for event, context in item.events_and_contexts:
+                    # There should only ever be one item in
+                    # events_and_contexts when current_state is
+                    # not None
+                    yield self._persist_event(
+                        event, context,
+                        current_state=item.current_state,
+                        backfilled=item.backfilled,
+                    )
+            else:
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
+
+        self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+    @defer.inlineCallbacks
+    def _persist_events(self, events_and_contexts, backfilled=False):
         if not events_and_contexts:
             return
 
@@ -118,8 +261,7 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context, current_state=None, backfilled=False):
-
+    def _persist_event(self, event, context, current_state=None, backfilled=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -136,9 +278,6 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_current_token()
-        defer.returnValue((stream_ordering, max_persisted_id))
-
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
@@ -427,6 +566,7 @@ class EventsStore(SQLBaseStore):
                     "outlier": event.internal_metadata.is_outlier(),
                     "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
+                    "received_ts": self._clock.time_msec(),
                 }
                 for event, _ in events_and_contexts
             ],
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 11feb3eb11..d9afd7ec87 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,3 +233,30 @@ class PusherStore(SQLBaseStore):
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
+
+    @defer.inlineCallbacks
+    def get_throttle_params_by_room(self, pusher_id):
+        res = yield self._simple_select_list(
+            "pusher_throttle",
+            {"pusher": pusher_id},
+            ["room_id", "last_sent_ts", "throttle_ms"],
+            desc="get_throttle_params_by_room"
+        )
+
+        params_by_room = {}
+        for row in res:
+            params_by_room[row["room_id"]] = {
+                "last_sent_ts": row["last_sent_ts"],
+                "throttle_ms": row["throttle_ms"]
+            }
+
+        defer.returnValue(params_by_room)
+
+    @defer.inlineCallbacks
+    def set_throttle_params(self, pusher_id, room_id, params):
+        yield self._simple_upsert(
+            "pusher_throttle",
+            {"pusher": pusher_id, "room_id": room_id},
+            params,
+            desc="set_throttle_params"
+        )
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index cd1b611a0c..94be820f86 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, max_entries=5000)
+    @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -375,7 +375,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7af0cae6a5..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore):
             make_guest,
             appservice_id
         )
+        self.get_user_by_id.invalidate((user_id,))
         self.is_guest.invalidate((user_id,))
 
     def _register(
@@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore):
                 (next_id, user_id, token,)
             )
 
+    @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
             table="users",
@@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore):
         }, {
             'password_hash': password_hash
         })
+        self.get_user_by_id.invalidate((user_id,))
 
     @defer.inlineCallbacks
     def user_delete_access_tokens(self, user_id, except_token_ids=[]):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 08a54cbdd1..9d6bfd5245 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 from synapse.api.constants import Membership
-from synapse.types import UserID
+from synapse.types import get_domian_from_id
 
 import logging
 
@@ -273,10 +273,7 @@ class RoomMemberStore(SQLBaseStore):
             room_id, membership=Membership.JOIN
         )
 
-        joined_domains = set(
-            UserID.from_string(r["user_id"]).domain
-            for r in rows
-        )
+        joined_domains = set(get_domian_from_id(r["user_id"]) for r in rows)
 
         return joined_domains
 
diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+    pusher BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    last_sent_ts BIGINT,
+    throttle_ms BIGINT,
+    PRIMARY KEY (pusher, room_id)
+);
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;