diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 18 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 8 | ||||
-rw-r--r-- | synapse/storage/deviceinbox.py | 26 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 41 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/presence.py | 7 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 12 | ||||
-rw-r--r-- | synapse/storage/schema/delta/39/device_federation_stream_idx.sql | 16 | ||||
-rw-r--r-- | synapse/storage/schema/delta/39/event_push_index.sql | 17 | ||||
-rw-r--r-- | synapse/storage/schema/delta/39/federation_out_position.sql | 22 | ||||
-rw-r--r-- | synapse/storage/stream.py | 47 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 47 |
12 files changed, 224 insertions, 39 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d828d6ee1d..b62c459d8b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -561,12 +561,17 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): + if keyvalues: + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" + "SELECT %(retcol)s FROM %(table)s %(where)s" ) % { "retcol": retcol, "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + "where": where, } txn.execute(sql, keyvalues.values()) @@ -744,10 +749,15 @@ class SQLBaseStore(object): @staticmethod def _simple_update_one_txn(txn, table, keyvalues, updatevalues): - update_sql = "UPDATE %s SET %s WHERE %s" % ( + if keyvalues: + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + + update_sql = "UPDATE %s SET %s %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) + where, ) txn.execute( diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 3d5994a580..514570561f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -39,6 +39,14 @@ class ApplicationServiceStore(SQLBaseStore): def get_app_services(self): return self.services_cache + def get_if_app_services_interested_in_user(self, user_id): + """Check if the user is one associated with an app service + """ + for service in self.services_cache: + if service.is_interested_in_user(user_id): + return True + return False + def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" + "SELECT stream_id, user_id" + " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() sql = ( - "SELECT stream_id, user_id, device_id, message_json" - " FROM device_inbox" + "SELECT stream_id, destination" + " FROM device_federation_outbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 9cd923eb93..7de3e8c58c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -39,6 +39,14 @@ class EventPushActionsStore(SQLBaseStore): columns=["user_id", "stream_ordering"], ) + self.register_background_index_update( + "event_push_actions_highlights_index", + index_name="event_push_actions_highlights_index", + table="event_push_actions", + columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], + where_clause="highlight=1" + ) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -88,8 +96,11 @@ class EventPushActionsStore(SQLBaseStore): topological_ordering, stream_ordering ) + # First get number of notifications. + # We don't need to put a notif=1 clause as all rows always have + # notif=1 sql = ( - "SELECT sum(notif), sum(highlight)" + "SELECT count(*)" " FROM event_push_actions ea" " WHERE" " user_id = ?" @@ -99,13 +110,27 @@ class EventPushActionsStore(SQLBaseStore): txn.execute(sql, (user_id, room_id)) row = txn.fetchone() - if row: - return { - "notify_count": row[0] or 0, - "highlight_count": row[1] or 0, - } - else: - return {"notify_count": 0, "highlight_count": 0} + notify_count = row[0] if row else 0 + + # Now get the number of highlights + sql = ( + "SELECT count(*)" + " FROM event_push_actions ea" + " WHERE" + " highlight = 1" + " AND user_id = ?" + " AND room_id = ?" + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + highlight_count = row[0] if row else 0 + + return { + "notify_count": notify_count, + "highlight_count": highlight_count, + } ret = yield self.runInteraction( "get_unread_event_push_actions_by_room", diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6576a30098..e46ae6502e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 38 +SCHEMA_VERSION = 39 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 21d0696640..7460f98a1f 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState", status_msg (str): User set status message. """ + def as_dict(self): + return dict(self._asdict()) + + @staticmethod + def from_dict(d): + return UserPresenceState(**d) + def copy_and_replace(self, **kwargs): return self._replace(**kwargs) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 49721656b6..cbec255966 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -156,12 +156,20 @@ class PushRuleStore(SQLBaseStore): event=event, ) - local_users_in_room = set(u for u in users_in_room if self.hs.is_mine_id(u)) + # We ignore app service users for now. This is so that we don't fill + # up the `get_if_users_have_pushers` cache with AS entries that we + # know don't have pushers, nor even read receipts. + local_users_in_room = set( + u for u in users_in_room + if self.hs.is_mine_id(u) + and not self.get_if_app_services_interested_in_user(u) + ) # users in the room who have pushers need to get push rules run because # that's how their pushers work if_users_with_pushers = yield self.get_if_users_have_pushers( - local_users_in_room, on_invalidate=cache_context.invalidate, + local_users_in_room, + on_invalidate=cache_context.invalidate, ) user_ids = set( uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql new file mode 100644 index 0000000000..00be801e90 --- /dev/null +++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id); diff --git a/synapse/storage/schema/delta/39/event_push_index.sql b/synapse/storage/schema/delta/39/event_push_index.sql new file mode 100644 index 0000000000..de2ad93e5c --- /dev/null +++ b/synapse/storage/schema/delta/39/event_push_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('event_push_actions_highlights_index', '{}'); diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql new file mode 100644 index 0000000000..5af814290b --- /dev/null +++ b/synapse/storage/schema/delta/39/federation_out_position.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE TABLE federation_stream_position( + type TEXT NOT NULL, + stream_id INTEGER NOT NULL + ); + + INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); + INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events; diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 888b1cb35d..7fa63b58a7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -765,3 +765,50 @@ class StreamStore(SQLBaseStore): "token": end_token, }, } + + @defer.inlineCallbacks + def get_all_new_events_stream(self, from_id, current_id, limit): + """Get all new events""" + + def get_all_new_events_stream_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e" + " WHERE" + " ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (from_id, current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_all_new_events_stream", get_all_new_events_stream_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) + + def get_federation_out_pos(self, typ): + return self._simple_select_one_onecol( + table="federation_stream_position", + retcol="stream_id", + keyvalues={"type": typ}, + desc="get_federation_out_pos" + ) + + def update_federation_out_pos(self, typ, stream_id): + return self._simple_update_one( + table="federation_stream_position", + keyvalues={"type": typ}, + updatevalues={"stream_id": stream_id}, + desc="update_federation_out_pos", + ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..809fdd311f 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. |