path: root/synapse/storage
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/__init__.py  |  27
-rw-r--r--  synapse/storage/_base.py  |  10
-rw-r--r--  synapse/storage/account_data.py  |  50
-rw-r--r--  synapse/storage/appservice.py  |  105
-rw-r--r--  synapse/storage/event_push_actions.py  |  146
-rw-r--r--  synapse/storage/events.py  |  175
-rw-r--r--  synapse/storage/openid.py  |  32
-rw-r--r--  synapse/storage/prepare_database.py  |  2
-rw-r--r--  synapse/storage/presence.py  |  1
-rw-r--r--  synapse/storage/push_rule.py  |  67
-rw-r--r--  synapse/storage/pusher.py  |  75
-rw-r--r--  synapse/storage/receipts.py  |  67
-rw-r--r--  synapse/storage/registration.py  |  3
-rw-r--r--  synapse/storage/room.py  |  18
-rw-r--r--  synapse/storage/roommember.py  |  45
-rw-r--r--  synapse/storage/schema/delta/30/as_users.py  |  4
-rw-r--r--  synapse/storage/schema/delta/32/events.sql  |  16
-rw-r--r--  synapse/storage/schema/delta/32/openid.sql  |  9
-rw-r--r--  synapse/storage/schema/delta/32/pusher_throttle.sql  |  23
-rw-r--r--  synapse/storage/schema/delta/32/remove_indices.sql  |  38
-rw-r--r--  synapse/storage/schema/delta/32/reports.sql  |  25
-rw-r--r--  synapse/storage/signatures.py  |  25
-rw-r--r--  synapse/storage/transactions.py  |  165
23 files changed, 857 insertions(+), 271 deletions(-)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py

index 045ae6c03f..6928a213e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import Cache +from ._base import Cache, LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,11 +82,13 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine self.client_ip_last_seen = Cache( @@ -114,6 +117,7 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") + self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( @@ -145,7 +149,7 @@ class DataStore(RoomMemberStore, RoomStore, "AccountDataAndTagsChangeCache", account_max, ) - self.__presence_on_startup = self._get_active_presence(db_conn) + self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( db_conn, "presence_stream", @@ -170,11 +174,24 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = LoggingTransaction( + db_conn.cursor(), + name="_find_stream_orderings_for_times_txn", + database_engine=self.database_engine, + after_callbacks=[] + ) + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): - active_on_startup = self.__presence_on_startup - self.__presence_on_startup = None + active_on_startup = self._presence_on_startup + self._presence_on_startup = None return active_on_startup def _get_active_presence(self, db_conn): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1e27c2c0ce..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -152,8 +152,8 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs - self._db_pool = hs.get_db_pool() self._clock = hs.get_clock() + self._db_pool = hs.get_db_pool() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 @@ -453,7 +453,9 @@ class SQLBaseStore(object): keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values insertion_values (dict): key/values to use when inserting - Returns: A deferred + Returns: + Deferred(bool): True if a new entry was created, False if an + existing one was updated. """ return self.runInteraction( desc, @@ -498,6 +500,10 @@ class SQLBaseStore(object): ) txn.execute(sql, allvalues.values()) + return True + else: + return False + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
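The _simple_upsert change above makes the helper report whether it inserted a new row or merely updated an existing one. A minimal standalone sketch of that update-then-insert pattern, written against plain sqlite3 rather than Synapse's transaction wrappers (the throttle table and its columns here are only illustrative):

import sqlite3


def simple_upsert(conn, table, keyvalues, values):
    """Update the row matching keyvalues, or insert it if no row matched.

    Returns True if a new row was created, False if an existing row was
    updated -- mirroring the return value added to _simple_upsert_txn.
    """
    cur = conn.cursor()
    if values:
        set_clause = ", ".join("%s = ?" % k for k in values)
        where_clause = " AND ".join("%s = ?" % k for k in keyvalues)
        cur.execute(
            "UPDATE %s SET %s WHERE %s" % (table, set_clause, where_clause),
            list(values.values()) + list(keyvalues.values()),
        )
        if cur.rowcount > 0:
            return False  # an existing row was updated in place

    allvalues = dict(keyvalues)
    allvalues.update(values)
    cur.execute(
        "INSERT INTO %s (%s) VALUES (%s)" % (
            table, ", ".join(allvalues), ", ".join("?" for _ in allvalues),
        ),
        list(allvalues.values()),
    )
    return True  # a brand new row was created


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE throttle (pusher TEXT, room_id TEXT, throttle_ms INT)")
assert simple_upsert(conn, "throttle", {"pusher": "p1", "room_id": "!r"}, {"throttle_ms": 100})
assert not simple_upsert(conn, "throttle", {"pusher": "p1", "room_id": "!r"}, {"throttle_ms": 200})
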
index 7a7fbf1e52..ec7e8d40d2 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks + import ujson as json import logging @@ -24,6 +26,7 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): + @cached() def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. @@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore): "get_account_data_for_user", get_account_data_for_user_txn ) + @cachedInlineCallbacks(num_args=2) + def get_global_account_data_by_type_for_user(self, data_type, user_id): + """ + Returns: + Deferred: A dict + """ + result = yield self._simple_select_one_onecol( + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": data_type, + }, + retcol="content", + desc="get_global_account_data_by_type_for_user", + allow_none=True, + ) + + if result: + defer.returnValue(json.loads(result)) + else: + defer.returnValue(None) + + @cachedList(cached_method_name="get_global_account_data_by_type_for_user", + num_args=2, list_name="user_ids", inlineCallbacks=True) + def get_global_account_data_by_type_for_users(self, data_type, user_ids): + rows = yield self._simple_select_many_batch( + table="account_data", + column="user_id", + iterable=user_ids, + keyvalues={ + "account_data_type": data_type, + }, + retcols=("user_id", "content",), + desc="get_global_account_data_by_type_for_users", + ) + + defer.returnValue({ + row["user_id"]: json.loads(row["content"]) if row["content"] else None + for row in rows + }) + def get_account_data_for_room(self, user_id, room_id): """Get all the client account_data for a user for a room. @@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id, ) + txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) self._update_max_stream_id(txn, next_id) with self._account_data_id_gen.get_next() as next_id: @@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id, ) + txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) + txn.call_after( + self.get_global_account_data_by_type_for_user.invalidate, + (account_data_type, user_id,) + ) self._update_max_stream_id(txn, next_id) with self._account_data_id_gen.get_next() as next_id: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 371600eebb..feb9d228ae 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,16 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import urllib -import yaml import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership -from synapse.appservice import ApplicationService, AppServiceTransaction -from synapse.config._base import ConfigError +from synapse.appservice import AppServiceTransaction +from synapse.config.appservice import load_appservices from synapse.storage.roommember import RoomsForUser -from synapse.types import UserID from ._base import SQLBaseStore @@ -34,7 +31,7 @@ class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) self.hostname = hs.hostname - self.services_cache = ApplicationServiceStore.load_appservices( + self.services_cache = load_appservices( hs.hostname, hs.config.app_service_config_files ) @@ -144,102 +141,6 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - @classmethod - def _load_appservice(cls, hostname, as_info, config_filename): - required_string_fields = [ - "id", "url", "as_token", "hs_token", "sender_localpart" - ] - for field in required_string_fields: - if not isinstance(as_info.get(field), basestring): - raise KeyError("Required string field: '%s' (%s)" % ( - field, config_filename, - )) - - localpart = as_info["sender_localpart"] - if urllib.quote(localpart) != localpart: - raise ValueError( - "sender_localpart needs characters which are not URL encoded." - ) - user = UserID(localpart, hostname) - user_id = user.to_string() - - # namespace checks - if not isinstance(as_info.get("namespaces"), dict): - raise KeyError("Requires 'namespaces' object.") - for ns in ApplicationService.NS_LIST: - # specific namespaces are optional - if ns in as_info["namespaces"]: - # expect a list of dicts with exclusive and regex keys - for regex_obj in as_info["namespaces"][ns]: - if not isinstance(regex_obj, dict): - raise ValueError( - "Expected namespace entry in %s to be an object," - " but got %s", ns, regex_obj - ) - if not isinstance(regex_obj.get("regex"), basestring): - raise ValueError( - "Missing/bad type 'regex' key in %s", regex_obj - ) - if not isinstance(regex_obj.get("exclusive"), bool): - raise ValueError( - "Missing/bad type 'exclusive' key in %s", regex_obj - ) - return ApplicationService( - token=as_info["as_token"], - url=as_info["url"], - namespaces=as_info["namespaces"], - hs_token=as_info["hs_token"], - sender=user_id, - id=as_info["id"], - ) - - @classmethod - def load_appservices(cls, hostname, config_files): - """Returns a list of Application Services from the config files.""" - if not isinstance(config_files, list): - logger.warning( - "Expected %s to be a list of AS config files.", config_files - ) - return [] - - # Dicts of value -> filename - seen_as_tokens = {} - seen_ids = {} - - appservices = [] - - for config_file in config_files: - try: - with open(config_file, 'r') as f: - appservice = ApplicationServiceStore._load_appservice( - hostname, yaml.load(f), config_file - ) - if appservice.id in seen_ids: - raise ConfigError( - "Cannot reuse ID across application services: " - "%s (files: %s, %s)" % ( - appservice.id, config_file, seen_ids[appservice.id], - ) - ) - seen_ids[appservice.id] = config_file - if appservice.token in seen_as_tokens: - raise ConfigError( - "Cannot reuse as_token across application services: " - "%s (files: %s, %s)" % ( - appservice.token, - config_file, - 
seen_as_tokens[appservice.token], - ) - ) - seen_as_tokens[appservice.token] = config_file - logger.info("Loaded application service: %s", appservice) - appservices.append(appservice) - except Exception as e: - logger.error("Failed to load appservice from '%s'", config_file) - logger.exception(e) - raise - return appservices - class ApplicationServiceTransactionStore(SQLBaseStore): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 86a98b6f11..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -115,19 +119,23 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None): + max_stream_ordering=None, + limit=20): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " + "FROM (" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" - ") AS rl " - "WHERE" + ") AS rl," + " event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" " ep.room_id = rl.room_id" " AND (" " ep.topological_ordering > rl.topological_ordering" @@ -144,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore): if max_stream_ordering is not None: sql += " AND ep.stream_ordering <= ?" args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering ASC" + sql += " ORDER BY ep.stream_ordering ASC LIMIT ?" + args.append(limit) txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( @@ -153,11 +162,13 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep " - "WHERE ep.room_id not in (" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ? " + " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" ") AND ep.user_id = ? AND ep.stream_ordering > ?" ) @@ -175,12 +186,30 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.stream_ordering > ?" 
+ " ORDER BY ep.stream_ordering ASC" + " LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result else None) + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") @@ -201,6 +230,93 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + """ + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id, ) + ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. + txn.execute( + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < ?", + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) + ) + + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago + ) + + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + if max_stream_ordering is None: + return 0 + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + + return range_end + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 21487724ed..2b3f79577b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,14 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json -from collections import namedtuple +from collections import deque, namedtuple + import logging import math @@ -50,28 +52,169 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +class _EventPeristenceQueue(object): + """Queues up events so that they can be persisted in bulk with only one + concurrent transaction per room. + """ + + _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( + "events_and_contexts", "current_state", "backfilled", "deferred", + )) + + def __init__(self): + self._event_persist_queues = {} + self._currently_persisting_rooms = set() + + def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state): + """Add events to the queue, with the given persist_event options. + """ + queue = self._event_persist_queues.setdefault(room_id, deque()) + if queue: + end_item = queue[-1] + if end_item.current_state or current_state: + # We perist events with current_state set to True one at a time + pass + if end_item.backfilled == backfilled: + end_item.events_and_contexts.extend(events_and_contexts) + return end_item.deferred.observe() + + deferred = ObservableDeferred(defer.Deferred()) + + queue.append(self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + current_state=current_state, + deferred=deferred, + )) + + return deferred.observe() + + def handle_queue(self, room_id, per_item_callback): + """Attempts to handle the queue for a room if not already being handled. + + The given callback will be invoked with for each item in the queue,1 + of type _EventPersistQueueItem. The per_item_callback will continuously + be called with new items, unless the queue becomnes empty. The return + value of the function will be given to the deferreds waiting on the item, + exceptions will be passed to the deferres as well. + + This function should therefore be called whenever anything is added + to the queue. + + If another callback is currently handling the queue then it will not be + invoked. + """ + + if room_id in self._currently_persisting_rooms: + return + + self._currently_persisting_rooms.add(room_id) + + @defer.inlineCallbacks + def handle_queue_loop(): + try: + queue = self._get_drainining_queue(room_id) + for item in queue: + try: + ret = yield per_item_callback(item) + item.deferred.callback(ret) + except Exception as e: + item.deferred.errback(e) + finally: + queue = self._event_persist_queues.pop(room_id, None) + if queue: + self._event_persist_queues[room_id] = queue + self._currently_persisting_rooms.discard(room_id) + + preserve_fn(handle_queue_loop)() + + def _get_drainining_queue(self, room_id): + queue = self._event_persist_queues.setdefault(room_id, deque()) + + try: + while True: + yield queue.popleft() + except IndexError: + # Queue has been drained. 
+ pass + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" def __init__(self, hs): super(EventsStore, self).__init__(hs) + self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) - @defer.inlineCallbacks + self._event_persist_queue = _EventPeristenceQueue() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) backfilled: ? + """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) + + deferreds = [] + for room_id, evs_ctxs in partitioned.items(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + current_state=None, + ) + deferreds.append(d) - Returns: Tuple of stream_orderings where the first is the minimum and - last is the maximum stream ordering assigned to the events when - persisting. + for room_id in partitioned.keys(): + self._maybe_start_persisting(room_id) - """ + return defer.gatherResults(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, current_state=None, backfilled=False): + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, + current_state=current_state, + ) + + self._maybe_start_persisting(event.room_id) + + yield deferred + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(item): + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + + @defer.inlineCallbacks + def _persist_events(self, events_and_contexts, backfilled=False): if not events_and_contexts: return @@ -118,8 +261,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None, backfilled=False): - + def _persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -136,9 +278,6 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((stream_ordering, max_persisted_id)) - @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, @@ -203,9 +342,6 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) 
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) @@ -427,6 +563,7 @@ class EventsStore(SQLBaseStore): "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), } for event, _ in events_and_contexts ], @@ -1143,6 +1280,12 @@ class EventsStore(SQLBaseStore): current_backfill_id, current_forward_id, limit): """Get all the new events that have arrived at the server either as new events or as backfilled events""" + have_backfill_events = last_backfill_id != current_backfill_id + have_forward_events = last_forward_id != current_forward_id + + if not have_backfill_events and not have_forward_events: + return defer.succeed(AllNewEventsResult([], [], [], [], [])) + def get_all_new_events_txn(txn): sql = ( "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group" @@ -1155,7 +1298,7 @@ class EventsStore(SQLBaseStore): " ORDER BY e.stream_ordering ASC" " LIMIT ?" ) - if last_forward_id != current_forward_id: + if have_forward_events: txn.execute(sql, (last_forward_id, current_forward_id, limit)) new_forward_events = txn.fetchall() @@ -1199,7 +1342,7 @@ class EventsStore(SQLBaseStore): " ORDER BY e.stream_ordering DESC" " LIMIT ?" ) - if last_backfill_id != current_backfill_id: + if have_backfill_events: txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit)) new_backfill_events = txn.fetchall() diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644
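The new _EventPeristenceQueue batches events per room and guarantees at most one persistence transaction per room at a time. A simplified, synchronous sketch of the same queueing idea without Twisted deferreds (the class and method names here are hypothetical, chosen only to mirror the structure above):

from collections import deque


class PerRoomPersistQueue(object):
    """Batches events per room and drains each room's queue with at most
    one active handler at a time.
    """

    def __init__(self):
        self._queues = {}        # room_id -> deque of [events, backfilled]
        self._draining = set()   # rooms whose queue is currently being handled

    def add_to_queue(self, room_id, events, backfilled):
        queue = self._queues.setdefault(room_id, deque())
        if queue and queue[-1][1] == backfilled:
            # Same persist options as the batch at the tail: coalesce, so a
            # later drain can persist them in one transaction.
            queue[-1][0].extend(events)
        else:
            queue.append([list(events), backfilled])

    def handle_queue(self, room_id, per_batch_callback):
        if room_id in self._draining:
            return  # another caller is already draining this room
        self._draining.add(room_id)
        try:
            queue = self._queues.get(room_id, deque())
            while queue:
                events, backfilled = queue.popleft()
                per_batch_callback(events, backfilled)
        finally:
            self._draining.discard(room_id)
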
index 0000000000..5dabb607bd
--- /dev/null
+++ b/synapse/storage/openid.py
@@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
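The new OpenIdStore simply records a token with an expiry and resolves it back to a user while it is still valid. A small usage sketch against sqlite3, using the open_id_tokens schema added later in this commit (the token and user values are made up):

import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE open_id_tokens ("
    " token TEXT NOT NULL PRIMARY KEY,"
    " ts_valid_until_ms BIGINT NOT NULL,"
    " user_id TEXT NOT NULL)"
)


def insert_open_id_token(token, ts_valid_until_ms, user_id):
    conn.execute(
        "INSERT INTO open_id_tokens (token, ts_valid_until_ms, user_id)"
        " VALUES (?, ?, ?)",
        (token, ts_valid_until_ms, user_id),
    )


def get_user_id_for_open_id_token(token, ts_now_ms):
    row = conn.execute(
        "SELECT user_id FROM open_id_tokens"
        " WHERE token = ? AND ? <= ts_valid_until_ms",
        (token, ts_now_ms),
    ).fetchone()
    return row[0] if row else None


now_ms = int(time.time() * 1000)
insert_open_id_token("abc123", now_ms + 3600 * 1000, "@alice:example.com")
assert get_user_id_for_open_id_token("abc123", now_ms) == "@alice:example.com"
# Once the validity window has passed, the token no longer resolves.
assert get_user_id_for_open_id_token("abc123", now_ms + 2 * 3600 * 1000) is None
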
index 57f14fd12b..c8487c8838 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 31 +SCHEMA_VERSION = 32 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 07f5fae8dd..3fab57a7e8 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -149,6 +149,7 @@ class PresenceStore(SQLBaseStore): "status_msg", "currently_active", ), + desc="get_presence_for_users", ) for row in rows: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d2bf7f2aec..786d6f6d67 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,7 +14,8 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.push.baserules import list_with_base_rules from twisted.internet import defer import logging @@ -23,8 +24,31 @@ import simplejson as json logger = logging.getLogger(__name__) +def _load_rules(rawrules, enabled_map): + ruleslist = [] + for rawrule in rawrules: + rule = dict(rawrule) + rule["conditions"] = json.loads(rawrule["conditions"]) + rule["actions"] = json.loads(rawrule["actions"]) + ruleslist.append(rule) + + # We're going to be mutating this a lot, so do a deep copy + rules = list(list_with_base_rules(ruleslist)) + + for i, rule in enumerate(rules): + rule_id = rule['rule_id'] + if rule_id in enabled_map: + if rule.get('enabled', True) != bool(enabled_map[rule_id]): + # Rules are cached across users. + rule = dict(rule) + rule['enabled'] = bool(enabled_map[rule_id]) + rules[i] = rule + + return rules + + class PushRuleStore(SQLBaseStore): - @cachedInlineCallbacks() + @cachedInlineCallbacks(lru=True) def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( table="push_rules", @@ -42,9 +66,13 @@ class PushRuleStore(SQLBaseStore): key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) ) - defer.returnValue(rows) + enabled_map = yield self.get_push_rules_enabled_for_user(user_id) - @cachedInlineCallbacks() + rules = _load_rules(rows, enabled_map) + + defer.returnValue(rules) + + @cachedInlineCallbacks(lru=True) def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", @@ -60,12 +88,16 @@ class PushRuleStore(SQLBaseStore): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules(self, user_ids): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: [] + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules", @@ -75,18 +107,32 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules", ) - rows.sort(key=lambda e: (-e["priority_class"], -e["priority"])) + rows.sort( + key=lambda row: (-int(row["priority_class"]), -int(row["priority"])) + ) for row in rows: results.setdefault(row['user_name'], []).append(row) + + enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) + + for user_id, rules in results.items(): + results[user_id] = _load_rules( + rules, enabled_map_by_user.get(user_id, {}) + ) + defer.returnValue(results) - @defer.inlineCallbacks + @cachedList(cached_method_name="get_push_rules_enabled_for_user", + list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules_enabled(self, user_ids): if not user_ids: defer.returnValue({}) - results = {} + results = { + user_id: {} + for user_id in user_ids + } rows = yield self._simple_select_many_batch( table="push_rules_enable", @@ -96,7 +142,8 @@ class PushRuleStore(SQLBaseStore): desc="bulk_get_push_rules_enabled", ) for row in rows: - results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] + enabled = bool(row['enabled']) + results.setdefault(row['user_name'], {})[row['rule_id']] = enabled defer.returnValue(results) @defer.inlineCallbacks diff --git a/synapse/storage/pusher.py 
b/synapse/storage/pusher.py
index e5755c0aea..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,7 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList import logging import simplejson as json @@ -106,6 +106,9 @@ class PusherStore(SQLBaseStore): return self._pushers_id_gen.get_current_token() def get_all_updated_pushers(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed(([], [])) + def get_all_updated_pushers_txn(txn): sql = ( "SELECT id, user_name, access_token, profile_tag, kind," @@ -132,19 +135,35 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) - @cachedInlineCallbacks(num_args=1) - def get_users_with_pushers_in_room(self, room_id): - users = yield self.get_users_in_room(room_id) - + @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000) + def get_if_user_has_pusher(self, user_id): result = yield self._simple_select_many_batch( table='pushers', + keyvalues={ + 'user_name': 'user_id', + }, + retcol='user_name', + desc='get_if_user_has_pusher', + allow_none=True, + ) + + defer.returnValue(bool(result)) + + @cachedList(cached_method_name="get_if_user_has_pusher", + list_name="user_ids", num_args=1, inlineCallbacks=True) + def get_if_users_have_pushers(self, user_ids): + rows = yield self._simple_select_many_batch( + table='pushers', column='user_name', - iterable=users, + iterable=user_ids, retcols=['user_name'], - desc='get_users_with_pushers_in_room' + desc='get_if_users_have_pushers' ) - defer.returnValue([r['user_name'] for r in result]) + result = {user_id: False for user_id in user_ids} + result.update({r['user_name']: True for r in rows}) + + defer.returnValue(result) @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, @@ -153,8 +172,7 @@ class PusherStore(SQLBaseStore): profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: def f(txn): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) - return self._simple_upsert_txn( + newly_inserted = self._simple_upsert_txn( txn, "pushers", { @@ -175,11 +193,18 @@ class PusherStore(SQLBaseStore): "id": stream_id, }, ) - defer.returnValue((yield self.runInteraction("add_pusher", f))) + if newly_inserted: + # get_if_user_has_pusher only cares if the user has + # at least *one* pusher. 
+ txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + + yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + self._simple_delete_one_txn( txn, "pushers", @@ -191,6 +216,7 @@ class PusherStore(SQLBaseStore): {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, {"stream_id": stream_id}, ) + with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( "delete_pusher", delete_pusher_txn, stream_id @@ -230,3 +256,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
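get_if_user_has_pusher and get_if_users_have_pushers follow the cached / cachedList pairing used throughout this commit: one bulk query fills a per-user cache so the single-user variant can be answered without touching the database. A rough sketch of that batching pattern with an explicit cache dict (the helper signature is an assumption of this sketch; Synapse's descriptors manage the cache automatically):

import sqlite3


def get_if_users_have_pushers(conn, user_ids, cache):
    """Bulk lookup: every requested user defaults to False, users with at
    least one pusher flip to True, and each answer is remembered in cache.
    """
    misses = [u for u in user_ids if u not in cache]
    if misses:
        placeholders = ", ".join("?" for _ in misses)
        rows = conn.execute(
            "SELECT DISTINCT user_name FROM pushers"
            " WHERE user_name IN (%s)" % placeholders,
            misses,
        ).fetchall()
        found = {row[0] for row in rows}
        for user_id in misses:
            cache[user_id] = user_id in found
    return {user_id: cache[user_id] for user_id in user_ids}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pushers (user_name TEXT, app_id TEXT, pushkey TEXT)")
conn.execute("INSERT INTO pushers VALUES ('@alice:hs', 'app', 'key')")
cache = {}
assert get_if_users_have_pushers(conn, ["@alice:hs", "@bob:hs"], cache) == {
    "@alice:hs": True, "@bob:hs": False,
}
assert cache["@bob:hs"] is False  # a repeat call is served from the cache
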
index 3b8805593e..8c26f39fbb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore): "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() ) + @cachedInlineCallbacks() + def get_users_with_read_receipts_in_room(self, room_id): + receipts = yield self.get_receipts_for_room(room_id, "m.read") + defer.returnValue(set(r['user_id'] for r in receipts)) + + def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type, + user_id): + if receipt_type != "m.read": + return + + # Returns an ObservableDeferred + res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) + + if res and res.called and user_id in res.result: + # We'd only be adding to the set, so no point invalidating if the + # user is already there + return + + self.get_users_with_read_receipts_in_room.invalidate((room_id,)) + @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): return self._simple_select_list( @@ -100,7 +120,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000) + @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -229,10 +249,14 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) + txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -244,6 +268,17 @@ class ReceiptsStore(SQLBaseStore): (user_id, room_id, receipt_type) ) + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + topological_ordering = int(res["topological_ordering"]) if res else None + stream_ordering = int(res["stream_ordering"]) if res else None + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -255,16 +290,7 @@ class ReceiptsStore(SQLBaseStore): txn.execute(sql, (room_id, receipt_type, user_id)) results = txn.fetchall() - if results: - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - ) - topological_ordering = int(res["topological_ordering"]) - stream_ordering = int(res["stream_ordering"]) - + if results and topological_ordering: for to, so, _ in results: if int(to) > topological_ordering: return False @@ -294,6 +320,14 @@ class ReceiptsStore(SQLBaseStore): } ) + if receipt_type == "m.read" and topological_ordering: + self._remove_old_push_actions_before_txn( + txn, + room_id=room_id, + user_id=user_id, + topological_ordering=topological_ordering, + ) + return True @defer.inlineCallbacks @@ -364,10 +398,14 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) + txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: 
This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, @@ -391,6 +429,9 @@ class ReceiptsStore(SQLBaseStore): ) def get_all_updated_receipts(self, last_id, current_id, limit=None): + if last_id == current_id: + return defer.succeed([]) + def get_all_updated_receipts_txn(txn): sql = ( "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
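_invalidate_get_users_with_receipts_in_room relies on the observation that the set of users with read receipts in a room only ever grows, so a receipt from a user already in the cached set cannot change it. A small sketch of that short-cut with an ordinary dict-backed cache (the class and callback names are illustrative):

class ReadReceiptCache(object):
    """Caches the set of users with read receipts per room; the set only
    ever grows, so a receipt from a user already in it needs no
    invalidation.
    """

    def __init__(self, load_from_db):
        self._load_from_db = load_from_db   # room_id -> iterable of user_ids
        self._users_by_room = {}

    def get_users_with_read_receipts(self, room_id):
        if room_id not in self._users_by_room:
            self._users_by_room[room_id] = set(self._load_from_db(room_id))
        return self._users_by_room[room_id]

    def on_receipt(self, room_id, receipt_type, user_id):
        if receipt_type != "m.read":
            return
        cached = self._users_by_room.get(room_id)
        if cached is not None and user_id in cached:
            # The user is already in the cached set, so the set is still
            # correct: skip the invalidation.
            return
        # Otherwise drop the entry and let the next read repopulate it.
        self._users_by_room.pop(room_id, None)
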
index 7af0cae6a5..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 70aa64fb31..26933e593a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging +import ujson as json logger = logging.getLogger(__name__) @@ -221,3 +222,20 @@ class RoomStore(SQLBaseStore): aliases.extend(e.content['aliases']) defer.returnValue((name, aliases)) + + def add_event_report(self, room_id, event_id, user_id, reason, content, + received_ts): + next_id = self._event_reports_id_gen.get_next() + return self._simple_insert( + table="event_reports", + values={ + "id": next_id, + "received_ts": received_ts, + "room_id": room_id, + "event_id": event_id, + "user_id": user_id, + "reason": reason, + "content": json.dumps(content), + }, + desc="add_event_report" + ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 08a54cbdd1..64b4bd371b 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import Membership -from synapse.types import UserID +from synapse.types import get_domain_from_id import logging @@ -59,9 +59,6 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) - txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering ) @@ -137,24 +134,6 @@ class RoomMemberStore(SQLBaseStore): return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) - def get_room_members(self, room_id, membership=None): - """Retrieve the current room member list for a room. - - Args: - room_id (str): The room to get the list of members. - membership (synapse.api.constants.Membership): The filter to apply - to this list, or None to return all members with some state - associated with this room. - Returns: - list of namedtuples representing the members in this room. - """ - return self.runInteraction( - "get_room_members", - self._get_members_events_txn, - room_id, - membership=membership, - ).addCallback(self._get_events) - @cached() def get_invited_rooms_for_user(self, user_id): """ Get all the rooms the user is invited to @@ -259,26 +238,10 @@ class RoomMemberStore(SQLBaseStore): return results - @cached(max_entries=5000) + @cachedInlineCallbacks(max_entries=5000) def get_joined_hosts_for_room(self, room_id): - return self.runInteraction( - "get_joined_hosts_for_room", - self._get_joined_hosts_for_room_txn, - room_id, - ) - - def _get_joined_hosts_for_room_txn(self, txn, room_id): - rows = self._get_members_rows_txn( - txn, - room_id, membership=Membership.JOIN - ) - - joined_domains = set( - UserID.from_string(r["user_id"]).domain - for r in rows - ) - - return joined_domains + user_ids = yield self.get_users_in_room(room_id) + defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids)) def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): rows = self._get_members_rows_txn( diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
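get_joined_hosts_for_room now derives the set of remote hosts from the already-cached user list rather than re-reading membership rows, via get_domain_from_id. A tiny sketch of that derivation, assuming Matrix IDs of the form @localpart:domain (the helper body is a simplification of the real synapse.types function):

def get_domain_from_id(user_id):
    """Everything after the first colon of a Matrix ID is the homeserver."""
    idx = user_id.find(":")
    if idx == -1:
        raise ValueError("Invalid user ID: %r" % (user_id,))
    return user_id[idx + 1:]


def joined_hosts_for_room(user_ids):
    # Derive the hosts from the cached user list instead of querying
    # membership rows again.
    return set(get_domain_from_id(uid) for uid in user_ids)


assert joined_hosts_for_room(
    ["@alice:example.com", "@bob:matrix.org", "@carol:example.com"]
) == {"example.com", "matrix.org"}
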
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from synapse.storage.appservice import ApplicationServiceStore +from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): logger.warning("Could not get app_service_config_files from config") pass - appservices = ApplicationServiceStore.load_appservices( + appservices = load_appservices( config.server_name, config_files ) diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN received_ts BIGINT; diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644
index 0000000000..36f37b11c8
--- /dev/null
+++ b/synapse/storage/schema/delta/32/openid.sql
@@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE pusher_throttle( + pusher BIGINT NOT NULL, + room_id TEXT NOT NULL, + last_sent_ts BIGINT, + throttle_ms BIGINT, + PRIMARY KEY (pusher, room_id) +); diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- The following indices are redundant, other indices are equivalent or +-- supersets +DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream +DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT + +DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT + +-- The following indices were unused +DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id; +DROP INDEX IF EXISTS evauth_edges_auth_id; +DROP INDEX IF EXISTS presence_stream_state; diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql new file mode 100644
index 0000000000..d13609776f
--- /dev/null
+++ b/synapse/storage/schema/delta/32/reports.sql
@@ -0,0 +1,25 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE event_reports( + id BIGINT NOT NULL PRIMARY KEY, + received_ts BIGINT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + reason TEXT, + content TEXT +); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash +from synapse.util.caches.descriptors import cached, cachedList class SignatureStore(SQLBaseStore): """Persistence for event signatures and hashes""" + @cached(lru=True) + def get_event_reference_hash(self, event_id): + return self._get_event_reference_hashes_txn(event_id) + + @cachedList(cached_method_name="get_event_reference_hash", + list_name="event_ids", num_args=1) def get_event_reference_hashes(self, event_ids): def f(txn): - return [ - self._get_event_reference_hashes_txn(txn, ev) - for ev in event_ids - ] + return { + event_id: self._get_event_reference_hashes_txn(txn, event_id) + for event_id in event_ids + } return self.runInteraction( "get_event_reference_hashes", @@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore): hashes = yield self.get_event_reference_hashes( event_ids ) - hashes = [ - { + hashes = { + e_id: { k: encode_base64(v) for k, v in h.items() if k == "sha256" } - for h in hashes - ] + for e_id, h in hashes.items() + } - defer.returnValue(zip(event_ids, hashes)) + defer.returnValue(hashes.items()) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + self.last_transaction = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - next_id = self._transaction_id_gen.get_next() + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. 
- - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + self.last_transaction[destination] = transaction_id + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!")
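The rewritten TransactionStore stops writing one row per outbound transaction and instead buffers them in memory, flushing to sent_transactions on a timer and at shutdown. A condensed, non-Twisted sketch of that buffering strategy; insert_many and update_one stand in for the real _simple_insert_many_txn / _simple_update_one_txn calls and are assumptions of this sketch:

import itertools
from collections import namedtuple

TransactionRow = namedtuple(
    "TransactionRow",
    ("id", "transaction_id", "destination", "ts", "response_code", "response_json"),
)


class InMemoryTxnBuffer(object):
    """Holds outbound transactions in memory while in flight and flushes
    them to sent_transactions in bulk, instead of issuing one INSERT plus
    one UPDATE per transaction.
    """

    def __init__(self):
        self.inflight = {}           # destination -> {txn_id: TransactionRow}
        self.new_delivered = {}      # delivered before ever being persisted
        self.update_delivered = {}   # delivered after an earlier flush

    def on_send(self, row):
        self.inflight.setdefault(row.destination, {})[row.transaction_id] = row

    def on_delivered(self, destination, transaction_id, code):
        row = self.inflight.get(destination, {}).pop(transaction_id, None)
        if row is not None:
            # Never hit the database: remember the full row for a bulk insert.
            d = self.new_delivered.setdefault(destination, {})
            d[transaction_id] = row._replace(response_code=code)
        else:
            # Already flushed while in flight: only the code needs updating.
            d = self.update_delivered.setdefault(destination, {})
            d[transaction_id] = code

    def flush(self, insert_many, update_one):
        inflight, self.inflight = self.inflight, {}
        new, self.new_delivered = self.new_delivered, {}
        updates, self.update_delivered = self.update_delivered, {}

        full_rows = [
            row._asdict()
            for txn_map in itertools.chain(inflight.values(), new.values())
            for row in txn_map.values()
        ]
        if full_rows:
            insert_many("sent_transactions", full_rows)
        for dest, txn_map in updates.items():
            for txn_id, code in txn_map.items():
                update_one("sent_transactions", dest, txn_id, code)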