summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorDavid Baker <dbkr@users.noreply.github.com>2016-04-11 12:58:55 +0100
committerDavid Baker <dbkr@users.noreply.github.com>2016-04-11 12:58:55 +0100
commit2547dffccc2f2ead7e0d35dafd6da5d468e6d1d8 (patch)
tree76c9b510bdb25556553c0cbfa7072e36d11f7e75 /synapse/storage
parentPEP8 (diff)
parentRun unsafe proces in a loop until we've caught up (diff)
downloadsynapse-2547dffccc2f2ead7e0d35dafd6da5d468e6d1d8.tar.xz
Merge pull request #705 from matrix-org/dbkr/pushers_use_event_actions
Change pushers to use the event_actions table
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_push_actions.py48
-rw-r--r--synapse/storage/events.py14
-rw-r--r--synapse/storage/pusher.py86
-rw-r--r--synapse/storage/receipts.py9
-rw-r--r--synapse/storage/registration.py22
-rw-r--r--synapse/storage/roommember.py3
-rw-r--r--synapse/storage/schema/delta/31/pushers.py79
7 files changed, 209 insertions, 52 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3933b6e2c5..355478957d 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore):
         )
         defer.returnValue(ret)
 
+    @defer.inlineCallbacks
+    def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
+        def f(txn):
+            sql = (
+                "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
+                " stream_ordering >= ? AND stream_ordering <= ?"
+            )
+            txn.execute(sql, (min_stream_ordering, max_stream_ordering))
+            return [r[0] for r in txn.fetchall()]
+        ret = yield self.runInteraction("get_push_action_users_in_range", f)
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_unread_push_actions_for_user_in_range(self, user_id,
+                                                  min_stream_ordering,
+                                                  max_stream_ordering=None):
+        def f(txn):
+            sql = (
+                "SELECT event_id, stream_ordering, actions"
+                " FROM event_push_actions"
+                " WHERE user_id = ? AND stream_ordering > ?"
+            )
+            args = [user_id, min_stream_ordering]
+            if max_stream_ordering is not None:
+                sql += " AND stream_ordering <= ?"
+                args.append(max_stream_ordering)
+            sql += " ORDER BY stream_ordering ASC"
+            txn.execute(sql, args)
+            return txn.fetchall()
+        ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
+        defer.returnValue([
+            {
+                "event_id": row[0],
+                "stream_ordering": row[1],
+                "actions": json.loads(row[2]),
+            } for row in ret
+        ])
+
+    @defer.inlineCallbacks
+    def get_latest_push_action_stream_ordering(self):
+        def f(txn):
+            txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
+            return txn.fetchone()
+        result = yield self.runInteraction(
+            "get_latest_push_action_stream_ordering", f
+        )
+        defer.returnValue(result[0] or 0)
+
     def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
         # Sad that we have to blow away the cache for the whole room here
         txn.call_after(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ee87a71719..308a2c9b02 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
+        """
+        Write events to the database
+        Args:
+            events_and_contexts: list of tuples of (event, context)
+            backfilled: ?
+
+        Returns: Tuple of stream_orderings where the first is the minimum and
+            last is the maximum stream ordering assigned to the events when
+            persisting.
+
+        """
         if not events_and_contexts:
             return
 
@@ -191,6 +202,9 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+            txn.call_after(
+                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
+            )
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d1669c778a..19888a8e76 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 
 from canonicaljson import encode_canonical_json
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
 import logging
 import simplejson as json
 import types
@@ -48,6 +50,13 @@ class PusherStore(SQLBaseStore):
         return rows
 
     @defer.inlineCallbacks
+    def user_has_pusher(self, user_id):
+        ret = yield self._simple_select_one_onecol(
+            "pushers", {"user_name": user_id}, "id", allow_none=True
+        )
+        defer.returnValue(ret is not None)
+
+    @defer.inlineCallbacks
     def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
         def r(txn):
             sql = (
@@ -107,31 +116,46 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    @cachedInlineCallbacks(num_args=1)
+    def get_users_with_pushers_in_room(self, room_id):
+        users = yield self.get_users_in_room(room_id)
+
+        result = yield self._simple_select_many_batch(
+            'pushers', 'user_name', users, ['user_name']
+        )
+
+        defer.returnValue([r['user_name'] for r in result])
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
-                   pushkey, pushkey_ts, lang, data, profile_tag=""):
+                   pushkey, pushkey_ts, lang, data, last_stream_ordering,
+                   profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
-            yield self._simple_upsert(
-                "pushers",
-                dict(
-                    app_id=app_id,
-                    pushkey=pushkey,
-                    user_name=user_id,
-                ),
-                dict(
-                    access_token=access_token,
-                    kind=kind,
-                    app_display_name=app_display_name,
-                    device_display_name=device_display_name,
-                    ts=pushkey_ts,
-                    lang=lang,
-                    data=encode_canonical_json(data),
-                    profile_tag=profile_tag,
-                    id=stream_id,
-                ),
-                desc="add_pusher",
-            )
+            def f(txn):
+                txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+                return self._simple_upsert_txn(
+                    txn,
+                    "pushers",
+                    {
+                        "app_id": app_id,
+                        "pushkey": pushkey,
+                        "user_name": user_id,
+                    },
+                    {
+                        "access_token": access_token,
+                        "kind": kind,
+                        "app_display_name": app_display_name,
+                        "device_display_name": device_display_name,
+                        "ts": pushkey_ts,
+                        "lang": lang,
+                        "data": encode_canonical_json(data),
+                        "last_stream_ordering": last_stream_ordering,
+                        "profile_tag": profile_tag,
+                        "id": stream_id,
+                    },
+                )
+        defer.returnValue((yield self.runInteraction("add_pusher", f)))
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -153,22 +177,28 @@ class PusherStore(SQLBaseStore):
             )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+    def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+                                           last_stream_ordering):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token},
-            desc="update_pusher_last_token",
+            {'last_stream_ordering': last_stream_ordering},
+            desc="update_pusher_last_stream_ordering",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
-                                             last_token, last_success):
+    def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+                                                       user_id,
+                                                       last_stream_ordering,
+                                                       last_success):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token, 'last_success': last_success},
-            desc="update_pusher_last_token_and_success",
+            {
+                'last_stream_ordering': last_stream_ordering,
+                'last_success': last_success
+            },
+            desc="update_pusher_last_stream_ordering_and_success",
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 7fdd84bbdc..3b8805593e 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
-    def get_all_updated_receipts(self, last_id, current_id, limit):
+    def get_all_updated_receipts(self, last_id, current_id, limit=None):
         def get_all_updated_receipts_txn(txn):
             sql = (
                 "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
                 " FROM receipts_linearized"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_id, current_id, limit))
+            args = [last_id, current_id]
+            if limit is not None:
+                sql += " LIMIT ?"
+                args.append(limit)
+            txn.execute(sql, args)
 
             return txn.fetchall()
         return self.runInteraction(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1f71773aaa..7af0cae6a5 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError, Codes
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 
 class RegistrationStore(SQLBaseStore):
@@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore):
 
         defer.returnValue(res if res else False)
 
-    @cachedList(cached_method_name="is_guest", list_name="user_ids", num_args=1,
-                inlineCallbacks=True)
-    def are_guests(self, user_ids):
-        sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
-            ",".join("?" for _ in user_ids),
-        )
-
-        rows = yield self._execute(
-            "are_guests", self.cursor_to_dict, sql, *user_ids
-        )
-
-        result = {user_id: False for user_id in user_ids}
-
-        result.update({
-            row["name"]: bool(row["is_guest"])
-            for row in rows
-        })
-
-        defer.returnValue(result)
-
     def _query_for_auth(self, txn, token):
         sql = (
             "SELECT users.name, users.is_guest, access_tokens.id as token_id"
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 77518e893f..08a54cbdd1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -59,6 +59,9 @@ class RoomMemberStore(SQLBaseStore):
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(
+                self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
+            )
+            txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
             )
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
new file mode 100644
index 0000000000..93367fa09e
--- /dev/null
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -0,0 +1,79 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Change the last_token to last_stream_ordering now that pushers no longer
+# listen on an event stream but instead select out of the event_push_actions
+# table.
+
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def token_to_stream_ordering(token):
+    return int(token[1:].split('_')[0])
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    logger.info("Porting pushers table, delta 31...")
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS pushers2 (
+          id BIGINT PRIMARY KEY,
+          user_name TEXT NOT NULL,
+          access_token BIGINT DEFAULT NULL,
+          profile_tag VARCHAR(32) NOT NULL,
+          kind VARCHAR(8) NOT NULL,
+          app_id VARCHAR(64) NOT NULL,
+          app_display_name VARCHAR(64) NOT NULL,
+          device_display_name VARCHAR(128) NOT NULL,
+          pushkey TEXT NOT NULL,
+          ts BIGINT NOT NULL,
+          lang VARCHAR(8),
+          data TEXT,
+          last_stream_ordering INTEGER,
+          last_success BIGINT,
+          failing_since BIGINT,
+          UNIQUE (app_id, pushkey, user_name)
+        )
+    """)
+    cur.execute("""SELECT
+        id, user_name, access_token, profile_tag, kind,
+        app_id, app_display_name, device_display_name,
+        pushkey, ts, lang, data, last_token, last_success,
+        failing_since
+        FROM pushers
+    """)
+    count = 0
+    for row in cur.fetchall():
+        row = list(row)
+        row[12] = token_to_stream_ordering(row[12])
+        cur.execute(database_engine.convert_param_style("""
+            INSERT into pushers2 (
+            id, user_name, access_token, profile_tag, kind,
+            app_id, app_display_name, device_display_name,
+            pushkey, ts, lang, data, last_stream_ordering, last_success,
+            failing_since
+            ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
+            row
+        )
+        count += 1
+    cur.execute("DROP TABLE pushers")
+    cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
+    logger.info("Moved %d pushers to new table", count)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass