diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3933b6e2c5..5f61743e34 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
+ " stream_ordering >= ? AND stream_ordering <= ?"
+ )
+ txn.execute(sql, (min_stream_ordering, max_stream_ordering))
+ return [r[0] for r in txn.fetchall()]
+ ret = yield self.runInteraction("get_push_action_users_in_range", f)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_unread_push_actions_for_user_in_range(self, user_id,
+ min_stream_ordering,
+ max_stream_ordering=None):
+ def f(txn):
+ sql = (
+ "SELECT event_id, stream_ordering, actions"
+ " FROM event_push_actions"
+ " WHERE user_id = ? AND stream_ordering > ?"
+ )
+ args = [user_id, min_stream_ordering]
+ if max_stream_ordering is not None:
+ sql += " AND stream_ordering <= ?"
+ args.append(max_stream_ordering)
+ sql += " ORDER BY stream_ordering ASC"
+ txn.execute(sql, args)
+ return txn.fetchall()
+ ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
+ defer.returnValue([
+ {
+ "event_id": row[0],
+ "stream_ordering": row[1],
+ "actions": json.loads(row[2]),
+ } for row in ret
+ ])
+
+ @defer.inlineCallbacks
+ def get_latest_push_action_stream_ordering(self):
+ def f(txn):
+ txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
+ return txn.fetchone()
+ result = yield self.runInteraction(
+ "get_latest_push_action_stream_ordering", f
+ )
+ defer.returnValue(result[0] or 0)
+
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d299a1132..ceae8715ce 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
+ """
+ Write events to the database
+ Args:
+ events_and_contexts: list of tuples of (event, context)
+ backfilled: Whether these events are being backfilled rather than new, live events (defaults to False)
+
+ Returns: Tuple of stream_orderings where the first is the minimum and
+ last is the maximum stream ordering assigned to the events when
+ persisting.
+
+ """
if not events_and_contexts:
return
@@ -191,6 +202,7 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d1669c778a..f7886dd1bb 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
import logging
import simplejson as json
import types
@@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
+ @cachedInlineCallbacks(num_args=1)
+ def get_users_with_pushers_in_room(self, room_id):
+ users = yield self.get_users_in_room(room_id)
+
+ result = yield self._simple_select_many_batch(
+ 'pushers', 'user_name', users, ['user_name']
+ )
+
+ defer.returnValue([r['user_name'] for r in result])
+
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
- pushkey, pushkey_ts, lang, data, profile_tag=""):
- with self._pushers_id_gen.get_next() as stream_id:
- yield self._simple_upsert(
- "pushers",
- dict(
- app_id=app_id,
- pushkey=pushkey,
- user_name=user_id,
- ),
- dict(
- access_token=access_token,
- kind=kind,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- ts=pushkey_ts,
- lang=lang,
- data=encode_canonical_json(data),
- profile_tag=profile_tag,
- id=stream_id,
- ),
- desc="add_pusher",
- )
+ pushkey, pushkey_ts, lang, data, last_stream_ordering,
+ profile_tag=""):
+ def f(txn):
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ with self._pushers_id_gen.get_next() as stream_id:
+ return self._simple_upsert_txn(
+ txn,
+ "pushers",
+ dict(
+ app_id=app_id,
+ pushkey=pushkey,
+ user_name=user_id,
+ ),
+ dict(
+ access_token=access_token,
+ kind=kind,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ ts=pushkey_ts,
+ lang=lang,
+ data=encode_canonical_json(data),
+ last_stream_ordering=last_stream_ordering,
+ profile_tag=profile_tag,
+ id=stream_id,
+ ),
+ )
+ defer.returnValue((yield self.runInteraction("add_pusher", f)))
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+ def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+ last_stream_ordering):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token},
- desc="update_pusher_last_token",
+ {'last_stream_ordering': last_stream_ordering},
+ desc="update_pusher_last_stream_ordering",
)
@defer.inlineCallbacks
- def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
- last_token, last_success):
+ def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+ user_id,
+ last_stream_ordering,
+ last_success):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token, 'last_success': last_success},
- desc="update_pusher_last_token_and_success",
+ {
+ 'last_stream_ordering': last_stream_ordering,
+ 'last_success': last_success
+ },
+ desc="update_pusher_last_stream_ordering_and_success",
)
@defer.inlineCallbacks
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index d46a963bb8..701dd2f656 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore):
defer.returnValue(res if res else False)
- @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1,
- inlineCallbacks=True)
- def are_guests(self, user_ids):
- sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
- ",".join("?" for _ in user_ids),
- )
-
- rows = yield self._execute(
- "are_guests", self.cursor_to_dict, sql, *user_ids
- )
-
- result = {user_id: False for user_id in user_ids}
-
- result.update({
- row["name"]: bool(row["is_guest"])
- for row in rows
- })
-
- defer.returnValue(result)
-
def _query_for_auth(self, txn, token):
sql = (
"SELECT users.name, users.is_guest, access_tokens.id as token_id"
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 66e7a40e3c..22a690aa8d 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -58,6 +58,7 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,))
txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
new file mode 100644
index 0000000000..7e0e385fb5
--- /dev/null
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -0,0 +1,75 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Change the last_token to last_stream_ordering now that pushers no longer
+# listen on an event stream but instead select out of the event_push_actions
+# table.
+
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def token_to_stream_ordering(token):
+ return int(token[1:].split('_')[0])
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ logger.info("Porting pushers table, delta 31...")
+ cur.execute("""
+ CREATE TABLE IF NOT EXISTS pushers2 (
+ id BIGINT PRIMARY KEY,
+ user_name TEXT NOT NULL,
+ access_token BIGINT DEFAULT NULL,
+ profile_tag VARCHAR(32) NOT NULL,
+ kind VARCHAR(8) NOT NULL,
+ app_id VARCHAR(64) NOT NULL,
+ app_display_name VARCHAR(64) NOT NULL,
+ device_display_name VARCHAR(128) NOT NULL,
+ pushkey TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ lang VARCHAR(8),
+ data TEXT,
+ last_stream_ordering INTEGER,
+ last_success BIGINT,
+ failing_since BIGINT,
+ UNIQUE (app_id, pushkey, user_name)
+ )
+ """)
+ cur.execute("""SELECT
+ id, user_name, access_token, profile_tag, kind,
+ app_id, app_display_name, device_display_name,
+ pushkey, ts, lang, data, last_token, last_success,
+ failing_since
+ FROM pushers
+ """)
+ count = 0
+ for row in cur.fetchall():
+ row = list(row)
+ row[12] = token_to_stream_ordering(row[12])
+ cur.execute(database_engine.convert_param_style("""
+ INSERT into pushers2 (
+ id, user_name, access_token, profile_tag, kind,
+ app_id, app_display_name, device_display_name,
+ pushkey, ts, lang, data, last_stream_ordering, last_success,
+ failing_since
+ ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
+ row
+ )
+ count += 1
+ cur.execute("DROP TABLE pushers")
+ cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
+ logger.info("Moved %d pushers to new table", count)
|