path: root/synapse/storage
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/__init__.py | 68
-rw-r--r--  synapse/storage/_base.py | 10
-rw-r--r--  synapse/storage/account_data.py | 50
-rw-r--r--  synapse/storage/appservice.py | 126
-rw-r--r--  synapse/storage/client_ips.py | 68
-rw-r--r--  synapse/storage/event_push_actions.py | 146
-rw-r--r--  synapse/storage/events.py | 456
-rw-r--r--  synapse/storage/presence.py | 1
-rw-r--r--  synapse/storage/push_rule.py | 67
-rw-r--r--  synapse/storage/pusher.py | 72
-rw-r--r--  synapse/storage/receipts.py | 64
-rw-r--r--  synapse/storage/registration.py | 3
-rw-r--r--  synapse/storage/room.py | 46
-rw-r--r--  synapse/storage/roommember.py | 45
-rw-r--r--  synapse/storage/schema/delta/30/as_users.py | 4
-rw-r--r--  synapse/storage/schema/delta/32/events.sql | 16
-rw-r--r--  synapse/storage/schema/delta/32/pusher_throttle.sql | 23
-rw-r--r--  synapse/storage/schema/delta/32/remove_indices.sql | 38
-rw-r--r--  synapse/storage/search.py | 29
-rw-r--r--  synapse/storage/signatures.py | 25
-rw-r--r--  synapse/storage/stream.py | 34
21 files changed, 881 insertions, 510 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py

index d970fde9e8..e93c3de66c 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import Cache +from ._base import LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -45,6 +45,7 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore from .openid import OpenIdStore +from .client_ips import ClientIpStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -58,12 +59,6 @@ import logging logger = logging.getLogger(__name__) -# Number of msec of granularity to store the user IP 'last seen' time. Smaller -# times give more inserts into the database even for readonly API hits -# 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 - - class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, @@ -84,17 +79,14 @@ class DataStore(RoomMemberStore, RoomStore, AccountDataStore, EventPushActionsStore, OpenIdStore, + ClientIpStore, ): def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine - self.client_ip_last_seen = Cache( - name="client_ip_last_seen", - keylen=4, - ) - self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", extra_tables=[("local_invites", "stream_id")] @@ -148,7 +140,7 @@ class DataStore(RoomMemberStore, RoomStore, "AccountDataAndTagsChangeCache", account_max, ) - self.__presence_on_startup = self._get_active_presence(db_conn) + self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( db_conn, "presence_stream", @@ -173,11 +165,24 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = LoggingTransaction( + db_conn.cursor(), + name="_find_stream_orderings_for_times_txn", + database_engine=self.database_engine, + after_callbacks=[] + ) + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): - active_on_startup = self.__presence_on_startup - self.__presence_on_startup = None + active_on_startup = self._presence_on_startup + self._presence_on_startup = None return active_on_startup def _get_active_presence(self, db_conn): @@ -203,39 +208,6 @@ class DataStore(RoomMemberStore, RoomStore, return [UserPresenceState(**row) for row in rows] @defer.inlineCallbacks - def insert_client_ip(self, user, access_token, ip, user_agent): - now = int(self._clock.time_msec()) - key = (user.to_string(), access_token, ip) - - try: - last_seen = self.client_ip_last_seen.get(key) - except KeyError: - last_seen = None - - # Rate-limited inserts - if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) - - self.client_ip_last_seen.prefill(key, now) - - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, - ) - - @defer.inlineCallbacks 
def count_daily_users(self): """ Counts the number of users who used this homeserver in the last 24 hours. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1e27c2c0ce..32c6677d47 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py
@@ -152,8 +152,8 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs - self._db_pool = hs.get_db_pool() self._clock = hs.get_clock() + self._db_pool = hs.get_db_pool() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 @@ -453,7 +453,9 @@ class SQLBaseStore(object): keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values insertion_values (dict): key/values to use when inserting - Returns: A deferred + Returns: + Deferred(bool): True if a new entry was created, False if an + existing one was updated. """ return self.runInteraction( desc, @@ -498,6 +500,10 @@ class SQLBaseStore(object): ) txn.execute(sql, allvalues.values()) + return True + else: + return False + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 7a7fbf1e52..ec7e8d40d2 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks + import ujson as json import logging @@ -24,6 +26,7 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): + @cached() def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. @@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore): "get_account_data_for_user", get_account_data_for_user_txn ) + @cachedInlineCallbacks(num_args=2) + def get_global_account_data_by_type_for_user(self, data_type, user_id): + """ + Returns: + Deferred: A dict + """ + result = yield self._simple_select_one_onecol( + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": data_type, + }, + retcol="content", + desc="get_global_account_data_by_type_for_user", + allow_none=True, + ) + + if result: + defer.returnValue(json.loads(result)) + else: + defer.returnValue(None) + + @cachedList(cached_method_name="get_global_account_data_by_type_for_user", + num_args=2, list_name="user_ids", inlineCallbacks=True) + def get_global_account_data_by_type_for_users(self, data_type, user_ids): + rows = yield self._simple_select_many_batch( + table="account_data", + column="user_id", + iterable=user_ids, + keyvalues={ + "account_data_type": data_type, + }, + retcols=("user_id", "content",), + desc="get_global_account_data_by_type_for_users", + ) + + defer.returnValue({ + row["user_id"]: json.loads(row["content"]) if row["content"] else None + for row in rows + }) + def get_account_data_for_room(self, user_id, room_id): """Get all the client account_data for a user for a room. @@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id, ) + txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) self._update_max_stream_id(txn, next_id) with self._account_data_id_gen.get_next() as next_id: @@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id, ) + txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) + txn.call_after( + self.get_global_account_data_by_type_for_user.invalidate, + (account_data_type, user_id,) + ) self._update_max_stream_id(txn, next_id) with self._account_data_id_gen.get_next() as next_id: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
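The @cached / @cachedList pairing used above lets the batch method (get_global_account_data_by_type_for_users) fill and reuse the per-user cache owned by the single-item method. As a rough illustration of the idea only, here is a toy synchronous version with a hypothetical fetch_many callable; Synapse's real descriptors also handle Deferreds, invalidation callbacks and cache size limits.

class BatchCachedLookup(object):
    """Toy model of the cached/cachedList pattern: single lookups and
    batch lookups share one per-key cache."""

    def __init__(self, fetch_many):
        self._fetch_many = fetch_many  # callable: iterable of keys -> dict
        self._cache = {}

    def get_one(self, key):
        if key not in self._cache:
            self._cache[key] = self._fetch_many([key]).get(key)
        return self._cache[key]

    def get_many(self, keys):
        missing = [k for k in keys if k not in self._cache]
        if missing:
            fetched = self._fetch_many(missing)
            for k in missing:
                # Cache misses as None too, so repeated lookups stay cheap.
                self._cache[k] = fetched.get(k)
        return {k: self._cache[k] for k in keys}

    def invalidate(self, key):
        self._cache.pop(key, None)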
index 371600eebb..d1ee533fac 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py
@@ -13,16 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import urllib -import yaml import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.appservice import ApplicationService, AppServiceTransaction -from synapse.config._base import ConfigError +from synapse.appservice import AppServiceTransaction +from synapse.config.appservice import load_appservices from synapse.storage.roommember import RoomsForUser -from synapse.types import UserID from ._base import SQLBaseStore @@ -34,7 +31,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.hostname = hs.hostname - self.services_cache = ApplicationServiceStore.load_appservices( + self.services_cache = load_appservices( hs.hostname, hs.config.app_service_config_files ) @@ -144,102 +141,6 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @classmethod - def _load_appservice(cls, hostname, as_info, config_filename): - required_string_fields = [ - "id", "url", "as_token", "hs_token", "sender_localpart" - ] - for field in required_string_fields: - if not isinstance(as_info.get(field), basestring): - raise KeyError("Required string field: '%s' (%s)" % ( - field, config_filename, - )) - - localpart = as_info["sender_localpart"] - if urllib.quote(localpart) != localpart: - raise ValueError( - "sender_localpart needs characters which are not URL encoded." - ) - user = UserID(localpart, hostname) - user_id = user.to_string() - - # namespace checks - if not isinstance(as_info.get("namespaces"), dict): - raise KeyError("Requires 'namespaces' object.") - for ns in ApplicationService.NS_LIST: - # specific namespaces are optional - if ns in as_info["namespaces"]: - # expect a list of dicts with exclusive and regex keys - for regex_obj in as_info["namespaces"][ns]: - if not isinstance(regex_obj, dict): - raise ValueError( - "Expected namespace entry in %s to be an object," - " but got %s", ns, regex_obj - ) - if not isinstance(regex_obj.get("regex"), basestring): - raise ValueError( - "Missing/bad type 'regex' key in %s", regex_obj - ) - if not isinstance(regex_obj.get("exclusive"), bool): - raise ValueError( - "Missing/bad type 'exclusive' key in %s", regex_obj - ) - return ApplicationService( - token=as_info["as_token"], - url=as_info["url"], - namespaces=as_info["namespaces"], - hs_token=as_info["hs_token"], - sender=user_id, - id=as_info["id"], - ) - - @classmethod - def load_appservices(cls, hostname, config_files): - """Returns a list of Application Services from the config files.""" - if not isinstance(config_files, list): - logger.warning( - "Expected %s to be a list of AS config files.", config_files - ) - return [] - - # Dicts of value -> filename - seen_as_tokens = {} - seen_ids = {} - - appservices = [] - - for config_file in config_files: - try: - with open(config_file, 'r') as f: - appservice = ApplicationServiceStore._load_appservice( - hostname, yaml.load(f), config_file - ) - if appservice.id in seen_ids: - raise ConfigError( - "Cannot reuse ID across application services: " - "%s (files: %s, %s)" % ( - appservice.id, config_file, seen_ids[appservice.id], - ) - ) - seen_ids[appservice.id] = config_file - if appservice.token in seen_as_tokens: - raise ConfigError( - "Cannot reuse as_token across application services: " - "%s (files: %s, %s)" % ( - appservice.token, - config_file, - 
seen_as_tokens[appservice.token], - ) - ) - seen_as_tokens[appservice.token] = config_file - logger.info("Loaded application service: %s", appservice) - appservices.append(appservice) - except Exception as e: - logger.error("Failed to load appservice from '%s'", config_file) - logger.exception(e) - raise - return appservices - class ApplicationServiceTransactionStore(SQLBaseStore): @@ -397,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): dict(txn_id=txn_id, as_id=service.id) ) + @defer.inlineCallbacks def get_oldest_unsent_txn(self, service): """Get the oldest transaction which has not been sent for this service. @@ -407,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ - return self.runInteraction( + entry = yield self.runInteraction( "get_oldest_unsent_appservice_txn", self._get_oldest_unsent_txn, service ) + if not entry: + defer.returnValue(None) + + event_ids = json.loads(entry["event_ids"]) + + events = yield self._get_events(event_ids) + + defer.returnValue(AppServiceTransaction( + service=service, id=entry["txn_id"], events=events + )) + def _get_oldest_unsent_txn(self, txn, service): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) @@ -427,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): entry = rows[0] - event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(txn, event_ids) - - return AppServiceTransaction( - service=service, id=entry["txn_id"], events=events - ) + return entry def _get_last_txn(self, txn, service_id): txn.execute( diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py new file mode 100644
index 0000000000..a90990e006
--- /dev/null
+++ b/synapse/storage/client_ips.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore, Cache
+
+from twisted.internet import defer
+
+
+# Number of msec of granularity to store the user IP 'last seen' time. Smaller
+# times give more inserts into the database even for readonly API hits
+# 120 seconds == 2 minutes
+LAST_SEEN_GRANULARITY = 120 * 1000
+
+
+class ClientIpStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen",
+            keylen=4,
+        )
+
+        super(ClientIpStore, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def insert_client_ip(self, user, access_token, ip, user_agent):
+        now = int(self._clock.time_msec())
+        key = (user.to_string(), access_token, ip)
+
+        try:
+            last_seen = self.client_ip_last_seen.get(key)
+        except KeyError:
+            last_seen = None
+
+        # Rate-limited inserts
+        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+            defer.returnValue(None)
+
+        self.client_ip_last_seen.prefill(key, now)
+
+        # It's safe not to lock here: a) no unique constraint,
+        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
+        yield self._simple_upsert(
+            "user_ips",
+            keyvalues={
+                "user_id": user.to_string(),
+                "access_token": access_token,
+                "ip": ip,
+                "user_agent": user_agent,
+            },
+            values={
+                "last_seen": now,
+            },
+            desc="insert_client_ip",
+            lock=False,
+        )
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
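insert_client_ip above only touches the database when the cached "last seen" value is older than LAST_SEEN_GRANULARITY, so read-only API hits do not each turn into a write. A minimal standalone sketch of that throttling idea, using a plain dict instead of Synapse's Cache class and a caller-supplied write_to_db function:

import time

LAST_SEEN_GRANULARITY_MS = 120 * 1000  # 2 minutes, as in the store above

_last_seen = {}  # (user_id, access_token, ip) -> ms timestamp of last write

def maybe_record_client_ip(write_to_db, user_id, access_token, ip, user_agent):
    """Call write_to_db at most once per key per granularity window."""
    now = int(time.time() * 1000)
    key = (user_id, access_token, ip)

    last = _last_seen.get(key)
    if last is not None and (now - last) < LAST_SEEN_GRANULARITY_MS:
        return False  # recorded recently; skip the database write

    _last_seen[key] = now
    write_to_db(user_id, access_token, ip, user_agent, now)
    return True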
index 86a98b6f11..940e11d7a2 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -115,19 +119,23 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None): + max_stream_ordering=None, + limit=20): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " + "FROM (" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" - ") AS rl " - "WHERE" + ") AS rl," + " event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" " ep.room_id = rl.room_id" " AND (" " ep.topological_ordering > rl.topological_ordering" @@ -144,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore): if max_stream_ordering is not None: sql += " AND ep.stream_ordering <= ?" args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering ASC" + sql += " ORDER BY ep.stream_ordering ASC LIMIT ?" + args.append(limit) txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( @@ -153,11 +162,13 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep " - "WHERE ep.room_id not in (" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ? " + " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" ") AND ep.user_id = ? AND ep.stream_ordering > ?" ) @@ -175,12 +186,30 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.stream_ordering > ?" 
+ " ORDER BY ep.stream_ordering ASC" + " LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result else None) + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") @@ -201,6 +230,93 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + """ + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id, ) + ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. + txn.execute( + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < ?", + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) + ) + + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago + ) + + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + if max_stream_ordering is None: + return 0 + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + + return range_end + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0307b2af3c..6d978ffcd5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -19,12 +19,17 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json -from collections import namedtuple +from collections import deque, namedtuple + +import synapse +import synapse.metrics + import logging import math @@ -33,6 +38,10 @@ import ujson as json logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) +persist_event_counter = metrics.register_counter("persisted_events") + + def encode_json(json_object): if USE_FROZEN_DICTS: # ujson doesn't like frozen_dicts @@ -50,28 +59,172 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +class _EventPeristenceQueue(object): + """Queues up events so that they can be persisted in bulk with only one + concurrent transaction per room. + """ + + _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( + "events_and_contexts", "current_state", "backfilled", "deferred", + )) + + def __init__(self): + self._event_persist_queues = {} + self._currently_persisting_rooms = set() + + def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state): + """Add events to the queue, with the given persist_event options. + """ + queue = self._event_persist_queues.setdefault(room_id, deque()) + if queue: + end_item = queue[-1] + if end_item.current_state or current_state: + # We perist events with current_state set to True one at a time + pass + if end_item.backfilled == backfilled: + end_item.events_and_contexts.extend(events_and_contexts) + return end_item.deferred.observe() + + deferred = ObservableDeferred(defer.Deferred()) + + queue.append(self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + current_state=current_state, + deferred=deferred, + )) + + return deferred.observe() + + def handle_queue(self, room_id, per_item_callback): + """Attempts to handle the queue for a room if not already being handled. + + The given callback will be invoked with for each item in the queue,1 + of type _EventPersistQueueItem. The per_item_callback will continuously + be called with new items, unless the queue becomnes empty. The return + value of the function will be given to the deferreds waiting on the item, + exceptions will be passed to the deferres as well. + + This function should therefore be called whenever anything is added + to the queue. + + If another callback is currently handling the queue then it will not be + invoked. 
+ """ + + if room_id in self._currently_persisting_rooms: + return + + self._currently_persisting_rooms.add(room_id) + + @defer.inlineCallbacks + def handle_queue_loop(): + try: + queue = self._get_drainining_queue(room_id) + for item in queue: + try: + ret = yield per_item_callback(item) + item.deferred.callback(ret) + except Exception as e: + item.deferred.errback(e) + finally: + queue = self._event_persist_queues.pop(room_id, None) + if queue: + self._event_persist_queues[room_id] = queue + self._currently_persisting_rooms.discard(room_id) + + preserve_fn(handle_queue_loop)() + + def _get_drainining_queue(self, room_id): + queue = self._event_persist_queues.setdefault(room_id, deque()) + + try: + while True: + yield queue.popleft() + except IndexError: + # Queue has been drained. + pass + + +_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" def __init__(self, hs): super(EventsStore, self).__init__(hs) + self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) - @defer.inlineCallbacks + self._event_persist_queue = _EventPeristenceQueue() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) backfilled: ? + """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) + + deferreds = [] + for room_id, evs_ctxs in partitioned.items(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + current_state=None, + ) + deferreds.append(d) - Returns: Tuple of stream_orderings where the first is the minimum and - last is the maximum stream ordering assigned to the events when - persisting. 
+ for room_id in partitioned.keys(): + self._maybe_start_persisting(room_id) - """ + return defer.gatherResults(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, current_state=None, backfilled=False): + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, + current_state=current_state, + ) + + self._maybe_start_persisting(event.room_id) + + yield deferred + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(item): + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + + @defer.inlineCallbacks + def _persist_events(self, events_and_contexts, backfilled=False): if not events_and_contexts: return @@ -115,11 +268,11 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, ) + persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None, backfilled=False): - + def _persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -133,12 +286,10 @@ class EventsStore(SQLBaseStore): current_state=current_state, backfilled=backfilled, ) + persist_event_counter.inc() except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((stream_ordering, max_persisted_id)) - @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, @@ -203,9 +354,6 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) @@ -427,6 +575,7 @@ class EventsStore(SQLBaseStore): "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), } for event, _ in events_and_contexts ], @@ -495,6 +644,8 @@ class EventsStore(SQLBaseStore): ], ) + self._add_to_cache(txn, events_and_contexts) + if backfilled: # Backfilled events come before the current state so we don't need # to update the current state table @@ -536,6 +687,45 @@ class EventsStore(SQLBaseStore): return + def _add_to_cache(self, txn, events_and_contexts): + to_prefill = [] + + rows = [] + N = 200 + for i in range(0, len(events_and_contexts), N): + ev_map = { + e[0].event_id: e[0] + for e in events_and_contexts[i:i + N] + } + if not ev_map: + break + + sql = ( + 
"SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(ev_map)),) + + txn.execute(sql, ev_map.keys()) + rows = self.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry( + event=event, + redacted_event=None, + )) + + def prefill(): + for cache_entry in to_prefill: + self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) @@ -601,100 +791,65 @@ class EventsStore(SQLBaseStore): event_id_list = event_ids event_ids = set(event_ids) - event_map = self._get_events_from_cache( + event_entry_map = self._get_events_from_cache( event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) - missing_events_ids = [e for e in event_ids if e not in event_map] + missing_events_ids = [e for e in event_ids if e not in event_entry_map] if missing_events_ids: missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, - get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) - event_map.update(missing_events) - - defer.returnValue([ - event_map[e_id] for e_id in event_id_list - if e_id in event_map and event_map[e_id] - ]) - - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - return [] + event_entry_map.update(missing_events) - event_map = self._get_events_from_cache( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + events = [] + for event_id in event_id_list: + entry = event_entry_map.get(event_id, None) + if not entry: + continue - missing_events_ids = [e for e in event_ids if e not in event_map] + if allow_rejected or not entry.event.rejected_reason: + if check_redacted and entry.redacted_event: + event = entry.redacted_event + else: + event = entry.event - if not missing_events_ids: - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] + events.append(event) - missing_events = self._fetch_events_txn( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + if get_prev_content: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender - event_map.update(missing_events) - - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] + defer.returnValue(events) def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate( - (event_id, check_redacted, get_prev_content) - ) + self._get_event_cache.invalidate((event_id,)) - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - events 
= self._get_events_txn( - txn, [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - return events[0] if events else None - - def _get_events_from_cache(self, events, check_redacted, get_prev_content, - allow_rejected): + def _get_events_from_cache(self, events, allow_rejected): event_map = {} for event_id in events: - try: - ret = self._get_event_cache.get( - (event_id, check_redacted, get_prev_content,) - ) + ret = self._get_event_cache.get((event_id,), None) + if not ret: + continue - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass + if allow_rejected or not ret.event.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None return event_map @@ -765,8 +920,7 @@ class EventsStore(SQLBaseStore): reactor.callFromThread(fire, event_list) @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): """Fetches events from the database using the _event_fetch_list. This allows batch and bulk fetching of events - it allows us to fetch events without having to create a new transaction for each request for events. @@ -804,8 +958,6 @@ class EventsStore(SQLBaseStore): [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, rejected_reason=row["rejects"], ) for row in rows @@ -814,7 +966,7 @@ class EventsStore(SQLBaseStore): ) defer.returnValue({ - e.event_id: e + e.event.event_id: e for e in res if e }) @@ -844,37 +996,8 @@ class EventsStore(SQLBaseStore): return rows - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return {} - - rows = self._fetch_event_rows( - txn, events, - ) - - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] - - res = [ - self._get_event_from_row_txn( - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ] - - return { - r.event_id: r - for r in res - } - @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -884,26 +1007,27 @@ class EventsStore(SQLBaseStore): table="rejections", keyvalues={"event_id": rejected_reason}, retcol="reason", - desc="_get_event_from_row", + desc="_get_event_from_row_rejected_reason", ) - ev = FrozenEvent( + original_ev = FrozenEvent( d, internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) - if check_redacted and redacted: - ev = prune_event(ev) + redacted_event = None + if redacted: + redacted_event = prune_event(original_ev) redaction_id = yield self._simple_select_one_onecol( table="redactions", - keyvalues={"redacts": ev.event_id}, + keyvalues={"redacts": redacted_event.event_id}, retcol="event_id", - desc="_get_event_from_row", + desc="_get_event_from_row_redactions", ) - ev.unsigned["redacted_by"] = redaction_id + redacted_event.unsigned["redacted_by"] = redaction_id # Get the redaction event. 
because = yield self.get_event( @@ -915,86 +1039,16 @@ class EventsStore(SQLBaseStore): if because: # It's fine to do add the event directly, since get_pdu_json # will serialise this field correctly - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield self.get_event( - ev.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev - ) - - defer.returnValue(ev) - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) + redacted_event.unsigned["redacted_because"] = because - if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( - txn, - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, + cache_entry = _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, ) - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = self._simple_select_one_onecol_txn( - txn, - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = self._get_event_txn( - txn, - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev - ) - - return ev - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - return self._get_events_txn(txn, event_ids) + defer.returnValue(cache_entry) @defer.inlineCallbacks def count_daily_messages(self): diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
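The _EventPeristenceQueue added above batches incoming events per room and ensures that only one persistence transaction per room is in flight at a time; whoever adds work to a queue that nobody is draining kicks off the drain loop. Below is a stripped-down synchronous sketch of that "one drainer per key" shape, with no Deferreds and no current_state handling; in Synapse the drain runs asynchronously, which is why the busy-room guard matters.

from collections import deque

class PerRoomQueue(object):
    """One pending-work queue per room, drained by at most one worker each."""

    def __init__(self, persist_batch):
        self._queues = {}            # room_id -> deque of event batches
        self._busy_rooms = set()     # rooms currently being drained
        self._persist_batch = persist_batch

    def add(self, room_id, events):
        self._queues.setdefault(room_id, deque()).append(events)
        self._maybe_drain(room_id)

    def _maybe_drain(self, room_id):
        if room_id in self._busy_rooms:
            return  # someone is already working through this room's queue
        self._busy_rooms.add(room_id)
        try:
            queue = self._queues.get(room_id)
            while queue:
                self._persist_batch(room_id, queue.popleft())
        finally:
            self._busy_rooms.discard(room_id)

q = PerRoomQueue(lambda room, evs: print(room, evs))
q.add("!room:example.org", ["$event1", "$event2"])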
index 07f5fae8dd..3fab57a7e8 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py
@@ -149,6 +149,7 @@ class PresenceStore(SQLBaseStore): "status_msg", "currently_active", ), + desc="get_presence_for_users", ) for row in rows: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d2bf7f2aec..786d6f6d67 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py
@@ -14,7 +14,8 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.push.baserules import list_with_base_rules from twisted.internet import defer import logging @@ -23,8 +24,31 @@ import simplejson as json logger = logging.getLogger(__name__) +def _load_rules(rawrules, enabled_map): + ruleslist = [] + for rawrule in rawrules: + rule = dict(rawrule) + rule["conditions"] = json.loads(rawrule["conditions"]) + rule["actions"] = json.loads(rawrule["actions"]) + ruleslist.append(rule) + + # We're going to be mutating this a lot, so do a deep copy + rules = list(list_with_base_rules(ruleslist)) + + for i, rule in enumerate(rules): + rule_id = rule['rule_id'] + if rule_id in enabled_map: + if rule.get('enabled', True) != bool(enabled_map[rule_id]): + # Rules are cached across users. + rule = dict(rule) + rule['enabled'] = bool(enabled_map[rule_id]) + rules[i] = rule + + return rules + + class PushRuleStore(SQLBaseStore): - @cachedInlineCallbacks() + @cachedInlineCallbacks(lru=True) def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( table="push_rules", @@ -42,9 +66,13 @@ class PushRuleStore(SQLBaseStore): key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) ) - defer.returnValue(rows) + enabled_map = yield self.get_push_rules_enabled_for_user(user_id) - @cachedInlineCallbacks() + rules = _load_rules(rows, enabled_map) + + defer.returnValue(rules) + + @cachedInlineCallbacks(lru=True) def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", @@ -60,12 +88,16 @@ class PushRuleStore(SQLBaseStore): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules(self, user_ids): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: [] + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules", @@ -75,18 +107,32 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules", ) - rows.sort(key=lambda e: (-e["priority_class"], -e["priority"])) + rows.sort( + key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) + ) for row in rows: results.setdefault(row['user_name'], []).append(row) + + enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) + + for user_id, rules in results.items(): + results[user_id] = _load_rules( + rules, enabled_map_by_user.get(user_id, {}) + ) + defer.returnValue(results) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_enabled_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules_enabled(self, user_ids): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: {} + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules_enable", @@ -96,7 +142,8 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules_enabled", ) for row in rows: - results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] + enabled = bool(row['enabled']) + results.setdefault(row['user_name'], {})[row['rule_id']] = enabled defer.returnValue(results) @defer.inlineCallbacks diff --git a/synapse/storage/pusher.py 
b/synapse/storage/pusher.py
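In the push_rule.py changes above, the rule dicts coming out of list_with_base_rules are shared between users via the cache, so _load_rules applies a per-user "enabled" override to a copy of the rule rather than mutating the shared object. The same copy-on-write idea in isolation; the rule data here is invented for illustration.

def apply_enabled_overrides(shared_rules, enabled_map):
    """Return a per-user rules list without mutating the shared input."""
    rules = list(shared_rules)
    for i, rule in enumerate(rules):
        rule_id = rule["rule_id"]
        if rule_id in enabled_map and rule.get("enabled", True) != bool(enabled_map[rule_id]):
            # Copy first: other users' views of this cached rule must not change.
            rule = dict(rule)
            rule["enabled"] = bool(enabled_map[rule_id])
            rules[i] = rule
    return rules

base = [{"rule_id": ".m.rule.contains_user_name", "enabled": True, "actions": ["notify"]}]
per_user = apply_enabled_overrides(base, {".m.rule.contains_user_name": 0})
print(per_user[0]["enabled"], base[0]["enabled"])  # False True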
index 11feb3eb11..a7d7c54d7e 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList import logging import simplejson as json @@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) - @cachedInlineCallbacks(num_args=1) - def get_users_with_pushers_in_room(self, room_id): - users = yield self.get_users_in_room(room_id) - + @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000) + def get_if_user_has_pusher(self, user_id): result = yield self._simple_select_many_batch( table='pushers', + keyvalues={ + 'user_name': 'user_id', + }, + retcol='user_name', + desc='get_if_user_has_pusher', + allow_none=True, + ) + + defer.returnValue(bool(result)) + + @cachedList(cached_method_name="get_if_user_has_pusher", + list_name="user_ids", num_args=1, inlineCallbacks=True) + def get_if_users_have_pushers(self, user_ids): + rows = yield self._simple_select_many_batch( + table='pushers', column='user_name', - iterable=users, + iterable=user_ids, retcols=['user_name'], - desc='get_users_with_pushers_in_room' + desc='get_if_users_have_pushers' ) - defer.returnValue([r['user_name'] for r in result]) + result = {user_id: False for user_id in user_ids} + result.update({r['user_name']: True for r in rows}) + + defer.returnValue(result) @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, @@ -156,8 +172,7 @@ class PusherStore(SQLBaseStore): profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: def f(txn): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) - return self._simple_upsert_txn( + newly_inserted = self._simple_upsert_txn( txn, "pushers", { @@ -178,11 +193,18 @@ class PusherStore(SQLBaseStore): "id": stream_id, }, ) - defer.returnValue((yield self.runInteraction("add_pusher", f))) + if newly_inserted: + # get_if_user_has_pusher only cares if the user has + # at least *one* pusher. 
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + + yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + self._simple_delete_one_txn( txn, "pushers", @@ -194,6 +216,7 @@ class PusherStore(SQLBaseStore): {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, {"stream_id": stream_id}, ) + with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( "delete_pusher", delete_pusher_txn, stream_id @@ -233,3 +256,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
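get_throttle_params_by_room and set_throttle_params above persist per-(pusher, room) throttling state in the new pusher_throttle table (added by the delta/32 schema further down). As a hedged illustration only, here is how that table can be read and written with plain sqlite3 using the same schema; how the pusher consumes last_sent_ts and throttle_ms is not part of this diff.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pusher_throttle("
    " pusher BIGINT NOT NULL, room_id TEXT NOT NULL,"
    " last_sent_ts BIGINT, throttle_ms BIGINT,"
    " PRIMARY KEY (pusher, room_id))"
)

def set_throttle_params(pusher_id, room_id, last_sent_ts, throttle_ms):
    conn.execute(
        "INSERT OR REPLACE INTO pusher_throttle"
        " (pusher, room_id, last_sent_ts, throttle_ms) VALUES (?, ?, ?, ?)",
        (pusher_id, room_id, last_sent_ts, throttle_ms),
    )

def get_throttle_params_by_room(pusher_id):
    rows = conn.execute(
        "SELECT room_id, last_sent_ts, throttle_ms FROM pusher_throttle WHERE pusher = ?",
        (pusher_id,),
    ).fetchall()
    return {r[0]: {"last_sent_ts": r[1], "throttle_ms": r[2]} for r in rows}

set_throttle_params(1, "!room:example.org", 1000000, 60000)
print(get_throttle_params_by_room(1))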
index 935fc503d9..8c26f39fbb 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore): "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() ) + @cachedInlineCallbacks() + def get_users_with_read_receipts_in_room(self, room_id): + receipts = yield self.get_receipts_for_room(room_id, "m.read") + defer.returnValue(set(r['user_id'] for r in receipts)) + + def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type, + user_id): + if receipt_type != "m.read": + return + + # Returns an ObservableDeferred + res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) + + if res and res.called and user_id in res.result: + # We'd only be adding to the set, so no point invalidating if the + # user is already there + return + + self.get_users_with_read_receipts_in_room.invalidate((room_id,)) + @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): return self._simple_select_list( @@ -100,7 +120,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000) + @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -229,10 +249,14 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) + txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -244,6 +268,17 @@ class ReceiptsStore(SQLBaseStore): (user_id, room_id, receipt_type) ) + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + topological_ordering = int(res["topological_ordering"]) if res else None + stream_ordering = int(res["stream_ordering"]) if res else None + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -255,16 +290,7 @@ class ReceiptsStore(SQLBaseStore): txn.execute(sql, (room_id, receipt_type, user_id)) results = txn.fetchall() - if results: - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - ) - topological_ordering = int(res["topological_ordering"]) - stream_ordering = int(res["stream_ordering"]) - + if results and topological_ordering: for to, so, _ in results: if int(to) > topological_ordering: return False @@ -294,6 +320,14 @@ class ReceiptsStore(SQLBaseStore): } ) + if receipt_type == "m.read" and topological_ordering: + self._remove_old_push_actions_before_txn( + txn, + room_id=room_id, + user_id=user_id, + topological_ordering=topological_ordering, + ) + return True @defer.inlineCallbacks @@ -364,10 +398,14 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) + txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: 
This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
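_invalidate_get_users_with_receipts_in_room above peeks at the cached result and skips invalidation when the user is already present, because a read receipt can only ever add members to the set. A toy version of that "invalidate only when the answer could change" check, using a plain dict as the cache rather than Synapse's ObservableDeferred-backed cache:

_users_with_receipts = {}  # room_id -> set of user_ids (stands in for the cache)

def on_read_receipt(room_id, receipt_type, user_id):
    if receipt_type != "m.read":
        return  # only m.read receipts feed this cache

    cached = _users_with_receipts.get(room_id)
    if cached is not None and user_id in cached:
        # The set can only grow, and this user is already in it,
        # so the cached value is still correct; no invalidation needed.
        return

    # Otherwise drop the entry and let the next reader rebuild it.
    _users_with_receipts.pop(room_id, None)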
index 7af0cae6a5..bda84a744a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 26933e593a..97f9f1929c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py
@@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore): @cachedInlineCallbacks() def get_room_name_and_aliases(self, room_id): - def f(txn): + def get_room_name(txn): sql = ( - "SELECT event_id FROM current_state_events " - "WHERE room_id = ? " + "SELECT name FROM room_names" + " INNER JOIN current_state_events USING (room_id, event_id)" + " WHERE room_id = ?" + " LIMIT 1" ) - sql += " AND ((type = 'm.room.name' AND state_key = '')" - sql += " OR type = 'm.room.aliases')" - txn.execute(sql, (room_id,)) - results = self.cursor_to_dict(txn) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + return None - return self._parse_events_txn(txn, results) + return [row[0] for row in txn.fetchall()] - events = yield self.runInteraction("get_room_name_and_aliases", f) + def get_room_aliases(txn): + sql = ( + "SELECT content FROM current_state_events" + " INNER JOIN events USING (room_id, event_id)" + " WHERE room_id = ?" + ) + txn.execute(sql, (room_id,)) + return [row[0] for row in txn.fetchall()] + + name = yield self.runInteraction("get_room_name", get_room_name) + alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases) - name = None aliases = [] - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) + for c in alias_contents: + try: + content = json.loads(c) + except: + continue + + aliases.extend(content.get('aliases', [])) defer.returnValue((name, aliases)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 08a54cbdd1..64b4bd371b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import Membership -from synapse.types import UserID +from synapse.types import get_domain_from_id import logging @@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) - txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering ) @@ -137,24 +134,6 @@ class RoomMemberStore(SQLBaseStore): return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) - def get_room_members(self, room_id, membership=None): - """Retrieve the current room member list for a room. - - Args: - room_id (str): The room to get the list of members. - membership (synapse.api.constants.Membership): The filter to apply - to this list, or None to return all members with some state - associated with this room. - Returns: - list of namedtuples representing the members in this room. - """ - return self.runInteraction( - "get_room_members", - self._get_members_events_txn, - room_id, - membership=membership, - ).addCallback(self._get_events) - @cached() def get_invited_rooms_for_user(self, user_id): """ Get all the rooms the user is invited to @@ -259,26 +238,10 @@ class RoomMemberStore(SQLBaseStore): return results - @cached(max_entries=5000) + @cachedInlineCallbacks(max_entries=5000) def get_joined_hosts_for_room(self, room_id): - return self.runInteraction( - "get_joined_hosts_for_room", - self._get_joined_hosts_for_room_txn, - room_id, - ) - - def _get_joined_hosts_for_room_txn(self, txn, room_id): - rows = self._get_members_rows_txn( - txn, - room_id, membership=Membership.JOIN - ) - - joined_domains = set( - UserID.from_string(r["user_id"]).domain - for r in rows - ) - - return joined_domains + user_ids = yield self.get_users_in_room(room_id) + defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids)) def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): rows = self._get_members_rows_txn( diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
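get_joined_hosts_for_room is rewritten above to derive the joined servers from the cached member list by taking the domain part of each Matrix user ID (everything after the first colon of "@localpart:server"). A minimal stand-in for get_domain_from_id, for illustration:

def get_domain_from_id(user_id):
    """Return the server name portion of a Matrix ID such as '@alice:example.org'."""
    idx = user_id.find(":")
    if idx == -1:
        raise ValueError("Invalid user ID: %r" % (user_id,))
    return user_id[idx + 1:]

def joined_hosts_for_room(joined_user_ids):
    return set(get_domain_from_id(uid) for uid in joined_user_ids)

print(joined_hosts_for_room(["@alice:example.org", "@bob:matrix.org", "@carol:example.org"]))
# {'example.org', 'matrix.org'}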
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
 
 logger = logging.getLogger(__name__)
 
@@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         logger.warning("Could not get app_service_config_files from config")
         pass
 
-    appservices = ApplicationServiceStore.load_appservices(
+    appservices = load_appservices(
         config.server_name, config_files
     )
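The delta now parses appservice registrations through the config layer rather than a static method on the store, which keeps old schema deltas working however the storage classes later change. A rough usage sketch (the server name and file path are made up for illustration):

from synapse.config.appservice import load_appservices

# load_appservices returns ApplicationService objects parsed from the
# registration YAML files; hypothetical inputs shown here.
appservices = load_appservices(
    "example.com", ["/etc/synapse/appservice-irc.yaml"]
)
for appservice in appservices:
    print(appservice.sender)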
diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
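received_ts records the local wall-clock time at which this homeserver persisted each event, letting later code translate a timestamp into a position in the event stream. An illustrative query under that assumption (not the actual helper added elsewhere in this change, and note the column is only populated for events persisted after the upgrade):

def first_stream_ordering_after_ts_txn(txn, ts_ms):
    # Lowest stream_ordering whose event was received at or after the given
    # millisecond timestamp; None if no such event exists.
    txn.execute(
        "SELECT stream_ordering FROM events"
        " WHERE received_ts >= ?"
        " ORDER BY stream_ordering ASC LIMIT 1",
        (ts_ms,),
    )
    row = txn.fetchone()
    return row[0] if row else None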
diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+    pusher BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    last_sent_ts BIGINT,
+    throttle_ms BIGINT,
+    PRIMARY KEY (pusher, room_id)
+);
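The table keys throttling state on (pusher, room_id) so a pusher can back off per room: last_sent_ts is when it last sent for that room and throttle_ms is the current backoff. A sketch of how a row might be maintained, assuming the store's generic _simple_upsert_txn helper (the accessors actually added to pusher.py in this change may be shaped differently):

def set_throttle_params_txn(self, txn, pusher_id, room_id, sent_ts, throttle_ms):
    # Record when we last sent for this (pusher, room) and the backoff to
    # apply next time; inserts the row if it does not exist yet.
    self._simple_upsert_txn(
        txn,
        table="pusher_throttle",
        keyvalues={"pusher": pusher_id, "room_id": room_id},
        values={"last_sent_ts": sent_ts, "throttle_ms": throttle_ms},
    )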
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 0224299625..12941d1775 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import logging
 import re
+import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, event_id FROM events"
+                "SELECT stream_ordering, event_id, room_id, type, content FROM events"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
@@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
-            rows = txn.fetchall()
+            rows = self.cursor_to_dict(txn)
             if not rows:
                 return 0
 
-            min_stream_id = rows[-1][0]
-            event_ids = [row[1] for row in rows]
-
-            events = self._get_events_txn(txn, event_ids)
+            min_stream_id = rows[-1]["stream_ordering"]
 
             event_search_rows = []
-            for event in events:
+            for row in rows:
                 try:
-                    event_id = event.event_id
-                    room_id = event.room_id
-                    content = event.content
-                    if event.type == "m.room.message":
+                    event_id = row["event_id"]
+                    room_id = row["room_id"]
+                    etype = row["type"]
+                    try:
+                        content = json.loads(row["content"])
+                    except:
+                        continue
+
+                    if etype == "m.room.message":
                         key = "content.body"
                         value = content["body"]
-                    elif event.type == "m.room.topic":
+                    elif etype == "m.room.topic":
                         key = "content.topic"
                         value = content["topic"]
-                    elif event.type == "m.room.name":
+                    elif etype == "m.room.name":
                         key = "content.name"
                         value = content["name"]
                 except (KeyError, AttributeError):
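The reindex now works from raw rows: content is decoded from its JSON text column with ujson and the searchable field is chosen by event type, instead of loading full event objects inside the transaction. The per-type selection amounts to the following (illustrative helper name; a missing field raises KeyError, which the surrounding try block treats as "skip this event"):

def searchable_field(etype, content):
    # Return the (key, value) pair to index, or None for event types the
    # search index does not cover.
    if etype == "m.room.message":
        return "content.body", content["body"]
    elif etype == "m.room.topic":
        return "content.topic", content["topic"]
    elif etype == "m.room.name":
        return "content.name", content["name"]
    return None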
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore
 from unpaddedbase64 import encode_base64
 
 from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.util.caches.descriptors import cached, cachedList
 
 
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
 
+    @cached(lru=True)
+    def get_event_reference_hash(self, event_id):
+        return self._get_event_reference_hashes_txn(event_id)
+
+    @cachedList(cached_method_name="get_event_reference_hash",
+                list_name="event_ids", num_args=1)
     def get_event_reference_hashes(self, event_ids):
         def f(txn):
-            return [
-                self._get_event_reference_hashes_txn(txn, ev)
-                for ev in event_ids
-            ]
+            return {
+                event_id: self._get_event_reference_hashes_txn(txn, event_id)
+                for event_id in event_ids
+            }
 
         return self.runInteraction(
             "get_event_reference_hashes",
@@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore):
         hashes = yield self.get_event_reference_hashes(
             event_ids
         )
-        hashes = [
-            {
+        hashes = {
+            e_id: {
                 k: encode_base64(v) for k, v in h.items()
                 if k == "sha256"
             }
-            for h in hashes
-        ]
+            for e_id, h in hashes.items()
+        }
 
-        defer.returnValue(zip(event_ids, hashes))
+        defer.returnValue(hashes.items())
 
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.
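The @cached / @cachedList pair is the storage layer's batching pattern: the list variant serves whatever it can from the single-entry cache named by cached_method_name and only invokes the decorated function for the missing keys, which must return a dict keyed by the requested IDs so each entry can be cached individually. A schematic sketch of that contract (names are illustrative and _load_one_txn is hypothetical):

from synapse.util.caches.descriptors import cached, cachedList

class ExampleStore(SQLBaseStore):
    @cached()
    def get_thing(self, thing_id):
        # Point lookup, cached per thing_id.
        return self.runInteraction("get_thing", self._load_one_txn, thing_id)

    @cachedList(cached_method_name="get_thing", list_name="thing_ids",
                num_args=1)
    def get_things(self, thing_ids):
        # Called only with the thing_ids that missed get_thing's cache;
        # returns {thing_id: value} so results are cached per key.
        def f(txn):
            return {tid: self._load_one_txn(txn, tid) for tid in thing_ids}
        return self.runInteraction("get_things", f)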
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 95b12559a6..b9ad965fd6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore):
                     return True
                 return False
 
-            ret = self._get_events_txn(
-                txn,
-                # apply the filter on the room id list
-                [
-                    r["event_id"] for r in rows
-                    if app_service_interested(r)
-                ],
-                get_prev_content=True
-            )
+            return [r for r in rows if app_service_interested(r)]
 
-            self._set_before_and_after(ret, rows)
+        rows = yield self.runInteraction("get_appservice_room_stream", f)
 
-            if rows:
-                key = "s%d" % max(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = to_key
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret, key
+        self._set_before_and_after(ret, rows, topo_order=from_id is None)
 
-        results = yield self.runInteraction("get_appservice_room_stream", f)
-        defer.returnValue(results)
+        if rows:
+            key = "s%d" % max(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = to_key
+
+        defer.returnValue((ret, key))
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,