diff options
Diffstat (limited to 'synapse/storage')
95 files changed, 1225 insertions, 686 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c46b653f11..5a9e7720d9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ from .pusher import PusherStore from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore +from .event_push_actions import EventPushActionsStore from .state import StateStore from .signatures import SignatureStore @@ -44,6 +45,10 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from util.id_generators import IdGenerator, StreamIdGenerator + +from synapse.util.caches.stream_change_cache import StreamChangeCache + import logging @@ -54,7 +59,7 @@ logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120*1000 +LAST_SEEN_GRANULARITY = 120 * 1000 class DataStore(RoomMemberStore, RoomStore, @@ -75,20 +80,100 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, + EventPushActionsStore ): - def __init__(self, hs): - super(DataStore, self).__init__(hs) + def __init__(self, db_conn, hs): self.hs = hs + self.database_engine = hs.database_engine - self.min_token_deferred = self._get_min_token() - self.min_token = None + cur = db_conn.cursor() + try: + cur.execute("SELECT MIN(stream_ordering) FROM events",) + rows = cur.fetchall() + self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1 + self.min_stream_token = min(self.min_stream_token, -1) + finally: + cur.close() self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, ) + self._stream_id_gen = StreamIdGenerator( + db_conn, "events", "stream_ordering" + ) + self._receipts_id_gen = StreamIdGenerator( + db_conn, "receipts_linearized", "stream_id" + ) + self._account_data_id_gen = StreamIdGenerator( + db_conn, "account_data_max_stream_id", "stream_id" + ) + + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) + self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self) + self._pushers_id_gen = IdGenerator("pushers", "id", self) + self._push_rule_id_gen = IdGenerator("push_rules", "id", self) + self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + + events_max = self._stream_id_gen.get_max_token(None) + event_cache_prefill, min_event_val = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", min_event_val, + prefilled_cache=event_cache_prefill, + ) + + self._membership_stream_cache = StreamChangeCache( + "MembershipStreamChangeCache", events_max, + ) + + account_max = self._account_data_id_gen.get_max_token(None) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", account_max, + ) + + super(DataStore, self).__init__(hs) + + def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): + # Fetch a mapping of room_id -> max stream position for "recent" rooms. + # It doesn't really matter how many we get, the StreamChangeCache will + # do the right thing to ensure it respects the max size of cache. + sql = ( + "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" + " WHERE %(stream)s > ? - 100000" + " GROUP BY %(entity)s" + ) % { + "table": table, + "entity": entity_column, + "stream": stream_column, + } + + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, (int(max_value),)) + rows = txn.fetchall() + + cache = { + row[0]: int(row[1]) + for row in rows + } + + if cache: + min_val = min(cache.values()) + else: + min_val = max_value + + return cache, min_val + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 17a14e001c..2e97ac84a8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,13 +15,11 @@ import logging from synapse.api.errors import StoreError -from synapse.util.logutils import log_function -from synapse.util.logcontext import preserve_context_over_fn, LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache import synapse.metrics -from util.id_generators import IdGenerator, StreamIdGenerator from twisted.internet import defer @@ -175,16 +173,6 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine - self._stream_id_gen = StreamIdGenerator("events", "stream_ordering") - self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) - self._state_groups_id_gen = IdGenerator("state_groups", "id", self) - self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) - self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self) - self._pushers_id_gen = IdGenerator("pushers", "id", self) - self._push_rule_id_gen = IdGenerator("push_rules", "id", self) - self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) - self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id") - def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -197,7 +185,7 @@ class SQLBaseStore(object): time_then = self._previous_loop_ts self._previous_loop_ts = time_now - ratio = (curr - prev)/(time_now - time_then) + ratio = (curr - prev) / (time_now - time_then) top_three_counters = self._txn_perf_counters.interval( time_now - time_then, limit=3 @@ -310,10 +298,10 @@ class SQLBaseStore(object): func, *args, **kwargs ) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) for after_callback, after_args in after_callbacks: after_callback(*after_args) @@ -338,14 +326,15 @@ class SQLBaseStore(object): return func(conn, *args, **kwargs) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) defer.returnValue(result) - def cursor_to_dict(self, cursor): + @staticmethod + def cursor_to_dict(cursor): """Converts a SQL cursor into an list of dicts. Args: @@ -402,8 +391,8 @@ class SQLBaseStore(object): if not or_ignore: raise - @log_function - def _simple_insert_txn(self, txn, table, values): + @staticmethod + def _simple_insert_txn(txn, table, values): keys, vals = zip(*values.items()) sql = "INSERT INTO %s (%s) VALUES(%s)" % ( @@ -414,7 +403,8 @@ class SQLBaseStore(object): txn.execute(sql, vals) - def _simple_insert_many_txn(self, txn, table, values): + @staticmethod + def _simple_insert_many_txn(txn, table, values): if not values: return @@ -537,9 +527,10 @@ class SQLBaseStore(object): table, keyvalues, retcol, allow_none=allow_none, ) - def _simple_select_one_onecol_txn(self, txn, table, keyvalues, retcol, + @classmethod + def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol, allow_none=False): - ret = self._simple_select_onecol_txn( + ret = cls._simple_select_onecol_txn( txn, table=table, keyvalues=keyvalues, @@ -554,7 +545,8 @@ class SQLBaseStore(object): else: raise StoreError(404, "No row found") - def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): + @staticmethod + def _simple_select_onecol_txn(txn, table, keyvalues, retcol): sql = ( "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { @@ -603,7 +595,8 @@ class SQLBaseStore(object): table, keyvalues, retcols ) - def _simple_select_list_txn(self, txn, table, keyvalues, retcols): + @classmethod + def _simple_select_list_txn(cls, txn, table, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -627,7 +620,83 @@ class SQLBaseStore(object): ) txn.execute(sql) - return self.cursor_to_dict(txn) + return cls.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _simple_select_many_batch(self, table, column, iterable, retcols, + keyvalues={}, desc="_simple_select_many_batch", + batch_size=100): + """Executes a SELECT query on the named table, which may return zero or + more rows, returning the result as a list of dicts. + + Filters rows by if value of `column` is in `iterable`. + + Args: + table : string giving the table name + column : column name to test for inclusion against `iterable` + iterable : list + keyvalues : dict of column names and values to select the rows with + retcols : list of strings giving the names of the columns to return + """ + results = [] + + if not iterable: + defer.returnValue(results) + + chunks = [ + iterable[i:i + batch_size] + for i in xrange(0, len(iterable), batch_size) + ] + for chunk in chunks: + rows = yield self.runInteraction( + desc, + self._simple_select_many_txn, + table, column, chunk, keyvalues, retcols + ) + + results.extend(rows) + + defer.returnValue(results) + + @classmethod + def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols): + """Executes a SELECT query on the named table, which may return zero or + more rows, returning the result as a list of dicts. + + Filters rows by if value of `column` is in `iterable`. + + Args: + txn : Transaction object + table : string giving the table name + column : column name to test for inclusion against `iterable` + iterable : list + keyvalues : dict of column names and values to select the rows with + retcols : list of strings giving the names of the columns to return + """ + if not iterable: + return [] + + sql = "SELECT %s FROM %s" % (", ".join(retcols), table) + + clauses = [] + values = [] + clauses.append( + "%s IN (%s)" % (column, ",".join("?" for _ in iterable)) + ) + values.extend(iterable) + + for key, value in keyvalues.items(): + clauses.append("%s = ?" % (key,)) + values.append(value) + + if clauses: + sql = "%s WHERE %s" % ( + sql, + " AND ".join(clauses), + ) + + txn.execute(sql, values) + return cls.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, desc="_simple_update_one"): @@ -654,7 +723,8 @@ class SQLBaseStore(object): table, keyvalues, updatevalues, ) - def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + @staticmethod + def _simple_update_one_txn(txn, table, keyvalues, updatevalues): update_sql = "UPDATE %s SET %s WHERE %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), @@ -671,7 +741,8 @@ class SQLBaseStore(object): if txn.rowcount > 1: raise StoreError(500, "More than one row matched") - def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + @staticmethod + def _simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False): select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), @@ -712,7 +783,8 @@ class SQLBaseStore(object): raise StoreError(500, "more than one row matched") return self.runInteraction(desc, func) - def _simple_delete_txn(self, txn, table, keyvalues): + @staticmethod + def _simple_delete_txn(txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index d1829f84e8..b8387fc500 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -83,7 +83,7 @@ class AccountDataStore(SQLBaseStore): "get_account_data_for_room", get_account_data_for_room_txn ) - def get_updated_account_data_for_user(self, user_id, stream_id): + def get_updated_account_data_for_user(self, user_id, stream_id, room_ids=None): """Get all the client account_data for a that's changed. Args: @@ -120,6 +120,12 @@ class AccountDataStore(SQLBaseStore): return (global_account_data, account_data_by_room) + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(stream_id) + ) + if not changed: + return ({}, {}) + return self.runInteraction( "get_updated_account_data_for_user", get_updated_account_data_for_user_txn ) @@ -151,6 +157,10 @@ class AccountDataStore(SQLBaseStore): "content": content_json, } ) + txn.call_after( + self._account_data_stream_cache.entity_has_changed, + user_id, next_id, + ) self._update_max_stream_id(txn, next_id) with (yield self._account_data_id_gen.get_next(self)) as next_id: @@ -186,6 +196,10 @@ class AccountDataStore(SQLBaseStore): "content": content_json, } ) + txn.call_after( + self._account_data_stream_cache.entity_has_changed, + user_id, next_id, + ) self._update_max_stream_id(txn, next_id) with (yield self._account_data_id_gen.get_next(self)) as next_id: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 39b7881c40..1100c67714 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,12 +15,12 @@ import logging import urllib import yaml -from simplejson import JSONDecodeError import simplejson as json from twisted.internet import defer from synapse.api.constants import Membership from synapse.appservice import ApplicationService, AppServiceTransaction +from synapse.config._base import ConfigError from synapse.storage.roommember import RoomsForUser from synapse.types import UserID from ._base import SQLBaseStore @@ -144,66 +144,9 @@ class ApplicationServiceStore(SQLBaseStore): return rooms_for_user_matching_user_id - def _parse_services_dict(self, results): - # SQL results in the form: - # [ - # { - # 'regex': "something", - # 'url': "something", - # 'namespace': enum, - # 'as_id': 0, - # 'token': "something", - # 'hs_token': "otherthing", - # 'id': 0 - # } - # ] - services = {} - for res in results: - as_token = res["token"] - if as_token is None: - continue - if as_token not in services: - # add the service - services[as_token] = { - "id": res["id"], - "url": res["url"], - "token": as_token, - "hs_token": res["hs_token"], - "sender": res["sender"], - "namespaces": { - ApplicationService.NS_USERS: [], - ApplicationService.NS_ALIASES: [], - ApplicationService.NS_ROOMS: [] - } - } - # add the namespace regex if one exists - ns_int = res["namespace"] - if ns_int is None: - continue - try: - services[as_token]["namespaces"][ - ApplicationService.NS_LIST[ns_int]].append( - json.loads(res["regex"]) - ) - except IndexError: - logger.error("Bad namespace enum '%s'. %s", ns_int, res) - except JSONDecodeError: - logger.error("Bad regex object '%s'", res["regex"]) - - service_list = [] - for service in services.values(): - service_list.append(ApplicationService( - token=service["token"], - url=service["url"], - namespaces=service["namespaces"], - hs_token=service["hs_token"], - sender=service["sender"], - id=service["id"] - )) - return service_list - def _load_appservice(self, as_info): required_string_fields = [ + # TODO: Add id here when it's stable to release "url", "as_token", "hs_token", "sender_localpart" ] for field in required_string_fields: @@ -245,7 +188,7 @@ class ApplicationServiceStore(SQLBaseStore): namespaces=as_info["namespaces"], hs_token=as_info["hs_token"], sender=user_id, - id=as_info["as_token"] # the token is the only unique thing here + id=as_info["id"] if "id" in as_info else as_info["as_token"], ) def _populate_appservice_cache(self, config_files): @@ -256,15 +199,38 @@ class ApplicationServiceStore(SQLBaseStore): ) return + # Dicts of value -> filename + seen_as_tokens = {} + seen_ids = {} + for config_file in config_files: try: with open(config_file, 'r') as f: appservice = self._load_appservice(yaml.load(f)) + if appservice.id in seen_ids: + raise ConfigError( + "Cannot reuse ID across application services: " + "%s (files: %s, %s)" % ( + appservice.id, config_file, seen_ids[appservice.id], + ) + ) + seen_ids[appservice.id] = config_file + if appservice.token in seen_as_tokens: + raise ConfigError( + "Cannot reuse as_token across application services: " + "%s (files: %s, %s)" % ( + appservice.token, + config_file, + seen_as_tokens[appservice.token], + ) + ) + seen_as_tokens[appservice.token] = config_file logger.info("Loaded application service: %s", appservice) self.services_cache.append(appservice) except Exception as e: logger.error("Failed to load appservice from '%s'", config_file) logger.exception(e) + raise class ApplicationServiceTransactionStore(SQLBaseStore): @@ -310,7 +276,8 @@ class ApplicationServiceTransactionStore(SQLBaseStore): "application_services_state", dict(as_id=service.id), ["state"], - allow_none=True + allow_none=True, + desc="get_appservice_state", ) if result: defer.returnValue(result.get("state")) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 45fccc2e5e..49904046cf 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index d92028ea43..1556619d5e 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 325740d7d0..5dd32b1413 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index bd3c8f9452..4290aea83a 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b549d314b..ec5a4d198b 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 98d66e0a86..17b7a9c077 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index a5a54ec011..91fac33b8b 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ class Sqlite3Engine(object): def _parse_match_info(buf): bufsize = len(buf) - return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)] + return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)] def _rank(raw_match_info): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 6d4421dd8f..ce2c794025 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -58,7 +58,7 @@ class EventFederationStore(SQLBaseStore): new_front = set() front_list = list(front) chunks = [ - front_list[x:x+100] + front_list[x:x + 100] for x in xrange(0, len(front), 100) ] for chunk in chunks: diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py new file mode 100644 index 0000000000..d77a817682 --- /dev/null +++ b/synapse/storage/event_push_actions.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer +from synapse.util.caches.descriptors import cachedInlineCallbacks + +import logging +import ujson as json + +logger = logging.getLogger(__name__) + + +class EventPushActionsStore(SQLBaseStore): + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): + """ + :param event: the event set actions for + :param tuples: list of tuples of (user_id, profile_tag, actions) + """ + values = [] + for uid, profile_tag, actions in tuples: + values.append({ + 'room_id': event.room_id, + 'event_id': event.event_id, + 'user_id': uid, + 'profile_tag': profile_tag, + 'actions': json.dumps(actions), + 'stream_ordering': event.internal_metadata.stream_ordering, + 'topological_ordering': event.depth, + 'notif': 1, + 'highlight': 1 if _action_has_highlight(actions) else 0, + }) + + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + self._simple_insert_many_txn(txn, "event_push_actions", values) + + @cachedInlineCallbacks(num_args=3, lru=True, tree=True) + def get_unread_event_push_actions_by_room_for_user( + self, room_id, user_id, last_read_event_id + ): + def _get_unread_event_push_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return {"notify_count": 0, "highlight_count": 0} + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT sum(notif), sum(highlight)" + " FROM event_push_actions ea" + " WHERE" + " user_id = ?" + " AND room_id = ?" + " AND (" + " topological_ordering > ?" + " OR (topological_ordering = ? AND stream_ordering > ?)" + ")" + ) + txn.execute(sql, ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + )) + row = txn.fetchone() + if row: + return { + "notify_count": row[0] or 0, + "highlight_count": row[1] or 0, + } + else: + return {"notify_count": 0, "highlight_count": 0} + + ret = yield self.runInteraction( + "get_unread_event_push_actions_by_room", + _get_unread_event_push_actions_by_room + ) + defer.returnValue(ret) + + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) + ) + + +def _action_has_highlight(actions): + for action in actions: + try: + if action.get("set_tweak", None) == "highlight": + return action.get("value", True) + except AttributeError: + pass + + return False diff --git a/synapse/storage/events.py b/synapse/storage/events.py index fc5725097c..3a5c6ee4b1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.logcontext import preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -66,11 +66,9 @@ class EventsStore(SQLBaseStore): return if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - start = self.min_token - 1 - self.min_token -= len(events_and_contexts) + 1 - stream_orderings = range(start, self.min_token, -1) + start = self.min_stream_token - 1 + self.min_stream_token -= len(events_and_contexts) + 1 + stream_orderings = range(start, self.min_stream_token, -1) @contextmanager def stream_ordering_manager(): @@ -86,7 +84,7 @@ class EventsStore(SQLBaseStore): event.internal_metadata.stream_ordering = stream chunks = [ - events_and_contexts[x:x+100] + events_and_contexts[x:x + 100] for x in xrange(0, len(events_and_contexts), 100) ] @@ -107,10 +105,8 @@ class EventsStore(SQLBaseStore): is_new_state=True, current_state=None): stream_ordering = None if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token + self.min_stream_token -= 1 + stream_ordering = self.min_stream_token if stream_ordering is None: stream_ordering_manager = yield self._stream_id_gen.get_next(self) @@ -209,17 +205,29 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): - - # Remove the any existing cache entries for the event_ids - for event, _ in events_and_contexts: + depth_updates = {} + for event, context in events_and_contexts: + # Remove the any existing cache entries for the event_ids txn.call_after(self._invalidate_get_event_cache, event.event_id) + if not backfilled: + txn.call_after( + self._events_stream_cache.entity_has_changed, + event.room_id, event.internal_metadata.stream_ordering, + ) - depth_updates = {} - for event, _ in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) + if not event.internal_metadata.is_outlier(): + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + if context.push_actions: + self._set_push_actions_for_event_and_users_txn( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts ) for room_id, depth in depth_updates.items(): @@ -662,14 +670,16 @@ class EventsStore(SQLBaseStore): for ids, d in lst: if not d.called: try: - d.callback([ - res[i] - for i in ids - if i in res - ]) + with PreserveLoggingContext(): + d.callback([ + res[i] + for i in ids + if i in res + ]) except: logger.exception("Failed to callback") - reactor.callFromThread(fire, event_list, row_dict) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -677,10 +687,12 @@ class EventsStore(SQLBaseStore): def fire(evs): for _, d in evs: if not d.called: - d.errback(e) + with PreserveLoggingContext(): + d.errback(e) if event_list: - reactor.callFromThread(fire, event_list) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -707,18 +719,20 @@ class EventsStore(SQLBaseStore): should_start = False if should_start: - self.runWithConnection( - self._do_fetch - ) + with PreserveLoggingContext(): + self.runWithConnection( + self._do_fetch + ) - rows = yield preserve_context_over_deferred(events_d) + with PreserveLoggingContext(): + rows = yield events_d if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] res = yield defer.gatherResults( [ - self._get_event_from_row( + preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -738,7 +752,7 @@ class EventsStore(SQLBaseStore): rows = [] N = 200 for i in range(1 + len(events) / N): - evs = events[i*N:(i + 1)*N] + evs = events[i * N:(i + 1) * N] if not evs: break @@ -753,7 +767,7 @@ class EventsStore(SQLBaseStore): " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(evs)),) + ) % (",".join(["?"] * len(evs)),) txn.execute(sql, evs) rows.extend(self.cursor_to_dict(txn)) @@ -936,6 +950,7 @@ class EventsStore(SQLBaseStore): ) now_reporting = self.cursor_to_dict(txn) if not now_reporting: + logger.info("Calculating daily messages skipped; no now_reporting") return None now_reporting = now_reporting[0]["stream_ordering"] @@ -948,11 +963,18 @@ class EventsStore(SQLBaseStore): ) if not last_reported: + logger.info("Calculating daily messages skipped; no last_reported") return None # Close enough to correct for our purposes. yesterday = (now - 24 * 60 * 60) - if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60: + since_yesterday_seconds = yesterday - last_reported[0]["reported_time"] + any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60 + if any_since_yesterday: + logger.info( + "Calculating daily messages skipped; since_yesterday_seconds: %d" % + (since_yesterday_seconds,) + ) return None txn.execute( @@ -968,6 +990,7 @@ class EventsStore(SQLBaseStore): ) rows = self.cursor_to_dict(txn) if not rows: + logger.info("Calculating daily messages skipped; messages count missing") return None return rows[0]["messages"] diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index fcd43c7fdd..5248736816 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,12 +16,13 @@ from twisted.internet import defer from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks import simplejson as json class FilteringStore(SQLBaseStore): - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=2) def get_user_filter(self, user_localpart, filter_id): def_json = yield self._simple_select_one_onecol( table="user_filters", diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 344cacdc75..fd05bfe54e 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ class KeyStore(SQLBaseStore): table="server_tls_certificates", keyvalues={"server_name": server_name}, retcols=("tls_certificate",), + desc="get_server_certificate", ) tls_certificate = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_ASN1, tls_certificate_bytes, diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 7bf57234f6..0894384780 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 16eff62544..850736c85e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014 - 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 27 +SCHEMA_VERSION = 29 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -211,7 +211,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, logger.debug("applied_delta_files: %s", applied_delta_files) for v in range(start_ver, SCHEMA_VERSION + 1): - logger.debug("Upgrading schema to v%d", v) + logger.info("Upgrading schema to v%d", v) delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 34ca3b9a54..ef525f34c5 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -48,27 +48,29 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) - @cachedList(get_presence_state.cache, list_name="user_localparts") + @cachedList(get_presence_state.cache, list_name="user_localparts", + inlineCallbacks=True) def get_presence_states(self, user_localparts): - def f(txn): - results = {} - for user_localpart in user_localparts: - res = self._simple_select_one_txn( - txn, - table="presence", - keyvalues={"user_id": user_localpart}, - retcols=["state", "status_msg", "mtime"], - allow_none=True, - ) - if res: - results[user_localpart] = res - - return results - - return self.runInteraction("get_presence_states", f) + rows = yield self._simple_select_many_batch( + table="presence", + column="user_id", + iterable=user_localparts, + retcols=("user_id", "state", "status_msg", "mtime",), + desc="get_presence_states", + ) + + defer.returnValue({ + row["user_id"]: { + "state": row["state"], + "status_msg": row["status_msg"], + "mtime": row["mtime"], + } + for row in rows + }) + @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - res = self._simple_update_one( + res = yield self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -78,7 +80,7 @@ class PresenceStore(SQLBaseStore): ) self.get_presence_state.invalidate((user_localpart,)) - return res + defer.returnValue(res) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index a6e52cb248..26a40905ae 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 5305b7e122..f9a48171ba 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,13 +25,16 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): @cachedInlineCallbacks() - def get_push_rules_for_user(self, user_name): + def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( - table=PushRuleTable.table_name, + table="push_rules", keyvalues={ - "user_name": user_name, + "user_name": user_id, }, - retcols=PushRuleTable.fields, + retcols=( + "user_name", "rule_id", "priority_class", "priority", + "conditions", "actions", + ), desc="get_push_rules_enabled_for_user", ) @@ -42,13 +45,15 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rows) @cachedInlineCallbacks() - def get_push_rules_enabled_for_user(self, user_name): + def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( - table=PushRuleEnableTable.table_name, + table="push_rules_enable", keyvalues={ - 'user_name': user_name + 'user_name': user_id }, - retcols=PushRuleEnableTable.fields, + retcols=( + "user_name", "rule_id", "enabled", + ), desc="get_push_rules_enabled_for_user", ) defer.returnValue({ @@ -56,6 +61,45 @@ class PushRuleStore(SQLBaseStore): }) @defer.inlineCallbacks + def bulk_get_push_rules(self, user_ids): + if not user_ids: + defer.returnValue({}) + + results = {} + + rows = yield self._simple_select_many_batch( + table="push_rules", + column="user_name", + iterable=user_ids, + retcols=("*",), + desc="bulk_get_push_rules", + ) + + rows.sort(key=lambda e: (-e["priority_class"], -e["priority"])) + + for row in rows: + results.setdefault(row['user_name'], []).append(row) + defer.returnValue(results) + + @defer.inlineCallbacks + def bulk_get_push_rules_enabled(self, user_ids): + if not user_ids: + defer.returnValue({}) + + results = {} + + rows = yield self._simple_select_many_batch( + table="push_rules_enable", + column="user_name", + iterable=user_ids, + retcols=("user_name", "rule_id", "enabled",), + desc="bulk_get_push_rules_enabled", + ) + for row in rows: + results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] + defer.returnValue(results) + + @defer.inlineCallbacks def add_push_rule(self, before, after, **kwargs): vals = kwargs if 'conditions' in vals: @@ -84,15 +128,16 @@ class PushRuleStore(SQLBaseStore): ) defer.returnValue(ret) - def _add_push_rule_relative_txn(self, txn, user_name, **kwargs): + def _add_push_rule_relative_txn(self, txn, user_id, **kwargs): after = kwargs.pop("after", None) - relative_to_rule = kwargs.pop("before", after) + before = kwargs.pop("before", None) + relative_to_rule = before or after res = self._simple_select_one_txn( txn, - table=PushRuleTable.table_name, + table="push_rules", keyvalues={ - "user_name": user_name, + "user_name": user_id, "rule_id": relative_to_rule, }, retcols=["priority_class", "priority"], @@ -116,7 +161,7 @@ class PushRuleStore(SQLBaseStore): new_rule.pop("before", None) new_rule.pop("after", None) new_rule['priority_class'] = priority_class - new_rule['user_name'] = user_name + new_rule['user_name'] = user_id new_rule['id'] = self._push_rule_id_gen.get_next_txn(txn) # check if the priority before/after is free @@ -129,16 +174,16 @@ class PushRuleStore(SQLBaseStore): new_rule['priority'] = new_rule_priority sql = ( - "SELECT COUNT(*) FROM " + PushRuleTable.table_name + + "SELECT COUNT(*) FROM push_rules" " WHERE user_name = ? AND priority_class = ? AND priority = ?" ) - txn.execute(sql, (user_name, priority_class, new_rule_priority)) + txn.execute(sql, (user_id, priority_class, new_rule_priority)) res = txn.fetchall() num_conflicting = res[0][0] # if there are conflicting rules, bump everything if num_conflicting: - sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority " + sql = "UPDATE push_rules SET priority = priority " if after: sql += "-1" else: @@ -149,30 +194,30 @@ class PushRuleStore(SQLBaseStore): else: sql += ">= ?" - txn.execute(sql, (user_name, priority_class, new_rule_priority)) + txn.execute(sql, (user_id, priority_class, new_rule_priority)) txn.call_after( - self.get_push_rules_for_user.invalidate, (user_name,) + self.get_push_rules_for_user.invalidate, (user_id,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, (user_name,) + self.get_push_rules_enabled_for_user.invalidate, (user_id,) ) self._simple_insert_txn( txn, - table=PushRuleTable.table_name, + table="push_rules", values=new_rule, ) - def _add_push_rule_highest_priority_txn(self, txn, user_name, + def _add_push_rule_highest_priority_txn(self, txn, user_id, priority_class, **kwargs): # find the highest priority rule in that class sql = ( - "SELECT COUNT(*), MAX(priority) FROM " + PushRuleTable.table_name + + "SELECT COUNT(*), MAX(priority) FROM push_rules" " WHERE user_name = ? and priority_class = ?" ) - txn.execute(sql, (user_name, priority_class)) + txn.execute(sql, (user_id, priority_class)) res = txn.fetchall() (how_many, highest_prio) = res[0] @@ -183,66 +228,66 @@ class PushRuleStore(SQLBaseStore): # and insert the new rule new_rule = kwargs new_rule['id'] = self._push_rule_id_gen.get_next_txn(txn) - new_rule['user_name'] = user_name + new_rule['user_name'] = user_id new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio txn.call_after( - self.get_push_rules_for_user.invalidate, (user_name,) + self.get_push_rules_for_user.invalidate, (user_id,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, (user_name,) + self.get_push_rules_enabled_for_user.invalidate, (user_id,) ) self._simple_insert_txn( txn, - table=PushRuleTable.table_name, + table="push_rules", values=new_rule, ) @defer.inlineCallbacks - def delete_push_rule(self, user_name, rule_id): + def delete_push_rule(self, user_id, rule_id): """ Delete a push rule. Args specify the row to be deleted and can be any of the columns in the push_rule table, but below are the standard ones Args: - user_name (str): The matrix ID of the push rule owner + user_id (str): The matrix ID of the push rule owner rule_id (str): The rule_id of the rule to be deleted """ yield self._simple_delete_one( - PushRuleTable.table_name, - {'user_name': user_name, 'rule_id': rule_id}, + "push_rules", + {'user_name': user_id, 'rule_id': rule_id}, desc="delete_push_rule", ) - self.get_push_rules_for_user.invalidate((user_name,)) - self.get_push_rules_enabled_for_user.invalidate((user_name,)) + self.get_push_rules_for_user.invalidate((user_id,)) + self.get_push_rules_enabled_for_user.invalidate((user_id,)) @defer.inlineCallbacks - def set_push_rule_enabled(self, user_name, rule_id, enabled): + def set_push_rule_enabled(self, user_id, rule_id, enabled): ret = yield self.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, - user_name, rule_id, enabled + user_id, rule_id, enabled ) defer.returnValue(ret) - def _set_push_rule_enabled_txn(self, txn, user_name, rule_id, enabled): + def _set_push_rule_enabled_txn(self, txn, user_id, rule_id, enabled): new_id = self._push_rules_enable_id_gen.get_next_txn(txn) self._simple_upsert_txn( txn, - PushRuleEnableTable.table_name, - {'user_name': user_name, 'rule_id': rule_id}, + "push_rules_enable", + {'user_name': user_id, 'rule_id': rule_id}, {'enabled': 1 if enabled else 0}, {'id': new_id}, ) txn.call_after( - self.get_push_rules_for_user.invalidate, (user_name,) + self.get_push_rules_for_user.invalidate, (user_id,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, (user_name,) + self.get_push_rules_enabled_for_user.invalidate, (user_id,) ) @@ -252,27 +297,3 @@ class RuleNotFoundException(Exception): class InconsistentRuleException(Exception): pass - - -class PushRuleTable(object): - table_name = "push_rules" - - fields = [ - "id", - "user_name", - "rule_id", - "priority_class", - "priority", - "conditions", - "actions", - ] - - -class PushRuleEnableTable(object): - table_name = "push_rules_enable" - - fields = [ - "user_name", - "rule_id", - "enabled" - ] diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 345c4e1104..8ec706178a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -80,17 +80,17 @@ class PusherStore(SQLBaseStore): defer.returnValue(rows) @defer.inlineCallbacks - def add_pusher(self, user_name, access_token, profile_tag, kind, app_id, + def add_pusher(self, user_id, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data): try: next_id = yield self._pushers_id_gen.get_next() yield self._simple_upsert( - PushersTable.table_name, + "pushers", dict( app_id=app_id, pushkey=pushkey, - user_name=user_name, + user_name=user_id, ), dict( access_token=access_token, @@ -112,42 +112,38 @@ class PusherStore(SQLBaseStore): raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name): + def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): yield self._simple_delete_one( - PushersTable.table_name, - {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name}, - desc="delete_pusher_by_app_id_pushkey_user_name", + "pushers", + {"app_id": app_id, "pushkey": pushkey, 'user_name': user_id}, + desc="delete_pusher_by_app_id_pushkey_user_id", ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_name, last_token): + def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): yield self._simple_update_one( - PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + "pushers", + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, {'last_token': last_token}, desc="update_pusher_last_token", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_name, + def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, last_token, last_success): yield self._simple_update_one( - PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + "pushers", + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, {'last_token': last_token, 'last_success': last_success}, desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_name, + def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): yield self._simple_update_one( - PushersTable.table_name, - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name}, + "pushers", + {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, {'failing_since': failing_since}, desc="update_pusher_failing_since", ) - - -class PushersTable(object): - table_name = "pushers" diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index a535063547..4202a6b3dc 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,12 +14,11 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached +from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer -from blist import sorteddict import logging import ujson as json @@ -31,7 +30,50 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = _RoomStreamChangeCache() + self._receipts_stream_cache = StreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None) + ) + + @cached(num_args=2) + def get_receipts_for_room(self, room_id, receipt_type): + return self._simple_select_list( + table="receipts_linearized", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + }, + retcols=("user_id", "event_id"), + desc="get_receipts_for_room", + ) + + @cached(num_args=3) + def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type): + return self._simple_select_one_onecol( + table="receipts_linearized", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id + }, + retcol="event_id", + desc="get_own_receipt_for_user", + allow_none=True, + ) + + @cachedInlineCallbacks(num_args=2) + def get_receipts_for_user(self, user_id, receipt_type): + def f(txn): + sql = ( + "SELECT room_id,event_id " + "FROM receipts_linearized " + "WHERE user_id = ? AND receipt_type = ? " + ) + txn.execute(sql, (user_id, receipt_type)) + return txn.fetchall() + + defer.returnValue(dict( + (yield self.runInteraction("get_receipts_for_user", f)) + )) @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): @@ -49,8 +91,8 @@ class ReceiptsStore(SQLBaseStore): room_ids = set(room_ids) if from_key: - room_ids = yield self._receipts_stream_cache.get_rooms_changed( - self, room_ids, from_key + room_ids = yield self._receipts_stream_cache.get_entities_changed( + room_ids, from_key ) results = yield self._get_linearized_receipts_for_rooms( @@ -182,29 +224,26 @@ class ReceiptsStore(SQLBaseStore): def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) - @cachedInlineCallbacks() - def get_graph_receipts_for_room(self, room_id): - """Get receipts for sending to remote servers. - """ - rows = yield self._simple_select_list( - table="receipts_graph", - keyvalues={"room_id": room_id}, - retcols=["receipt_type", "user_id", "event_id"], - desc="get_linearized_receipts_for_room", + def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_id, data, stream_id): + txn.call_after( + self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) + txn.call_after( + self.get_receipts_for_user.invalidate, (user_id, receipt_type) + ) + # FIXME: This shouldn't invalidate the whole cache + txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) - result = {} - for row in rows: - result.setdefault( - row["user_id"], {} - ).setdefault( - row["receipt_type"], [] - ).append(row["event_id"]) - - defer.returnValue(result) + txn.call_after( + self._receipts_stream_cache.entity_has_changed, + room_id, stream_id + ) - def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_id, data, stream_id): + txn.call_after( + self.get_last_receipt_event_id_for_user.invalidate, + (user_id, room_id, receipt_type) + ) # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -293,9 +332,6 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: - yield self._receipts_stream_cache.room_has_changed( - self, room_id, stream_id - ) have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, @@ -312,6 +348,7 @@ class ReceiptsStore(SQLBaseStore): ) max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_id, max_persisted_id)) def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, @@ -324,6 +361,15 @@ class ReceiptsStore(SQLBaseStore): def insert_graph_receipt_txn(self, txn, room_id, receipt_type, user_id, event_ids, data): + txn.call_after( + self.get_receipts_for_room.invalidate, (room_id, receipt_type) + ) + txn.call_after( + self.get_receipts_for_user.invalidate, (user_id, receipt_type) + ) + # FIXME: This shouldn't invalidate the whole cache + txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + self._simple_delete_txn( txn, table="receipts_graph", @@ -344,63 +390,3 @@ class ReceiptsStore(SQLBaseStore): "data": json.dumps(data), } ) - - -class _RoomStreamChangeCache(object): - """Keeps track of the stream_id of the latest change in rooms. - - Given a list of rooms and stream key, it will give a subset of rooms that - may have changed since that key. If the key is too old then the cache - will simply return all rooms. - """ - def __init__(self, size_of_cache=10000): - self._size_of_cache = size_of_cache - self._room_to_key = {} - self._cache = sorteddict() - self._earliest_key = None - self.name = "ReceiptsRoomChangeCache" - caches_by_name[self.name] = self._cache - - @defer.inlineCallbacks - def get_rooms_changed(self, store, room_ids, key): - """Returns subset of room ids that have had new receipts since the - given key. If the key is too old it will just return the given list. - """ - if key > (yield self._get_earliest_key(store)): - keys = self._cache.keys() - i = keys.bisect_right(key) - - result = set( - self._cache[k] for k in keys[i:] - ).intersection(room_ids) - - cache_counter.inc_hits(self.name) - else: - result = room_ids - cache_counter.inc_misses(self.name) - - defer.returnValue(result) - - @defer.inlineCallbacks - def room_has_changed(self, store, room_id, key): - """Informs the cache that the room has been changed at the given key. - """ - if key > (yield self._get_earliest_key(store)): - old_key = self._room_to_key.get(room_id, None) - if old_key: - key = max(key, old_key) - self._cache.pop(old_key, None) - self._cache[key] = room_id - - while len(self._cache) > self._size_of_cache: - k, r = self._cache.popitem() - self._earliest_key = max(k, self._earliest_key) - self._room_to_key.pop(r, None) - - @defer.inlineCallbacks - def _get_earliest_key(self, store): - if self._earliest_key is None: - self._earliest_key = yield store.get_max_receipt_stream_id() - self._earliest_key = int(self._earliest_key) - - defer.returnValue(self._earliest_key) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 09a05b08ef..967c732bda 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014 - 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,12 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re + from twisted.internet import defer from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList class RegistrationStore(SQLBaseStore): @@ -73,30 +75,45 @@ class RegistrationStore(SQLBaseStore): ) @defer.inlineCallbacks - def register(self, user_id, token, password_hash): + def register(self, user_id, token, password_hash, + was_guest=False, make_guest=False): """Attempts to register an account. Args: user_id (str): The desired user ID to register. token (str): The desired access token to use for this user. password_hash (str): Optional. The password hash for this user. + was_guest (bool): Optional. Whether this is a guest account being + upgraded to a non-guest account. + make_guest (boolean): True if the the new user should be guest, + false to add a regular user account. Raises: StoreError if the user_id could not be registered. """ yield self.runInteraction( "register", - self._register, user_id, token, password_hash + self._register, user_id, token, password_hash, was_guest, make_guest ) + self.is_guest.invalidate((user_id,)) - def _register(self, txn, user_id, token, password_hash): + def _register(self, txn, user_id, token, password_hash, was_guest, make_guest): now = int(self.clock.time()) next_id = self._access_tokens_id_gen.get_next_txn(txn) try: - txn.execute("INSERT INTO users(name, password_hash, creation_ts) " - "VALUES (?,?,?)", - [user_id, password_hash, now]) + if was_guest: + txn.execute("UPDATE users SET" + " password_hash = ?," + " upgrade_ts = ?," + " is_guest = ?" + " WHERE name = ?", + [password_hash, now, 1 if make_guest else 0, user_id]) + else: + txn.execute("INSERT INTO users " + "(name, password_hash, creation_ts, is_guest) " + "VALUES (?,?,?,?)", + [user_id, password_hash, now, 1 if make_guest else 0]) except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE @@ -117,8 +134,9 @@ class RegistrationStore(SQLBaseStore): keyvalues={ "name": user_id, }, - retcols=["name", "password_hash"], + retcols=["name", "password_hash", "is_guest"], allow_none=True, + desc="get_user_by_id", ) def get_users_by_id_case_insensitive(self, user_id): @@ -240,9 +258,41 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) + @cachedInlineCallbacks() + def is_guest(self, user_id): + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="is_guest", + allow_none=True, + desc="is_guest", + ) + + defer.returnValue(res if res else False) + + @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1, + inlineCallbacks=True) + def are_guests(self, user_ids): + sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % ( + ",".join("?" for _ in user_ids), + ) + + rows = yield self._execute( + "are_guests", self.cursor_to_dict, sql, *user_ids + ) + + result = {user_id: False for user_id in user_ids} + + result.update({ + row["name"]: bool(row["is_guest"]) + for row in rows + }) + + defer.returnValue(result) + def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, access_tokens.id as token_id" + "SELECT users.name, users.is_guest, access_tokens.id as token_id" " FROM users" " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" @@ -303,3 +353,37 @@ class RegistrationStore(SQLBaseStore): ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + + @defer.inlineCallbacks + def find_next_generated_user_id_localpart(self): + """ + Gets the localpart of the next generated user ID. + + Generated user IDs are integers, and we aim for them to be as small as + we can. Unfortunately, it's possible some of them are already taken by + existing users, and there may be gaps in the already taken range. This + function returns the start of the first allocatable gap. This is to + avoid the case of ID 10000000 being pre-allocated, so us wasting the + first (and shortest) many generated user IDs. + """ + def _find_next_generated_user_id(txn): + txn.execute("SELECT name FROM users") + rows = self.cursor_to_dict(txn) + + regex = re.compile("^@(\d+):") + + found = set() + + for r in rows: + user_id = r["name"] + match = regex.search(user_id) + if match: + found.add(int(match.group(1))) + for i in xrange(len(found) + 1): + if i not in found: + return i + + defer.returnValue((yield self.runInteraction( + "find_next_generated_user_id", + _find_next_generated_user_id + ))) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 0838eb3d12..40acb5c4ed 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 4f08df478c..46ab38a313 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -49,7 +49,7 @@ class RoomStore(SQLBaseStore): """ try: yield self._simple_insert( - RoomsTable.table_name, + "rooms", { "room_id": room_id, "creator": room_creator_user_id, @@ -70,9 +70,9 @@ class RoomStore(SQLBaseStore): A namedtuple containing the room information, or an empty list. """ return self._simple_select_one( - table=RoomsTable.table_name, + table="rooms", keyvalues={"room_id": room_id}, - retcols=RoomsTable.fields, + retcols=("room_id", "is_public", "creator"), desc="get_room", allow_none=True, ) @@ -87,90 +87,20 @@ class RoomStore(SQLBaseStore): desc="get_public_room_ids", ) - @defer.inlineCallbacks - def get_rooms(self, is_public): - """Retrieve a list of all public rooms. - - Args: - is_public (bool): True if the rooms returned should be public. - Returns: - A list of room dicts containing at least a "room_id" key, a - "topic" key if one is set, and a "name" key if one is set + def get_room_count(self): + """Retrieve a list of all rooms """ def f(txn): - def subquery(table_name, column_name=None): - column_name = column_name or table_name - return ( - "SELECT %(table_name)s.event_id as event_id, " - "%(table_name)s.room_id as room_id, %(column_name)s " - "FROM %(table_name)s " - "INNER JOIN current_state_events as c " - "ON c.event_id = %(table_name)s.event_id " % { - "column_name": column_name, - "table_name": table_name, - } - ) - - sql = ( - "SELECT" - " r.room_id," - " max(n.name)," - " max(t.topic)," - " max(v.history_visibility)," - " max(g.guest_access)" - " FROM rooms AS r" - " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" - " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" - " LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id" - " LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id" - " WHERE r.is_public = ?" - " GROUP BY r.room_id" % { - "topic": subquery("topics", "topic"), - "name": subquery("room_names", "name"), - "history_visibility": subquery("history_visibility"), - "guest_access": subquery("guest_access"), - } - ) + sql = "SELECT count(*) FROM rooms" + txn.execute(sql) + row = txn.fetchone() + return row[0] or 0 - txn.execute(sql, (is_public,)) - - rows = txn.fetchall() - - for i, row in enumerate(rows): - room_id = row[0] - aliases = self._simple_select_onecol_txn( - txn, - table="room_aliases", - keyvalues={ - "room_id": room_id - }, - retcol="room_alias", - ) - - rows[i] = list(row) + [aliases] - - return rows - - rows = yield self.runInteraction( + return self.runInteraction( "get_rooms", f ) - ret = [ - { - "room_id": r[0], - "name": r[1], - "topic": r[2], - "world_readable": r[3] == "world_readable", - "guest_can_join": r[4] == "can_join", - "aliases": r[5], - } - for r in rows - if r[5] # We only return rooms that have at least one alias. - ] - - defer.returnValue(ret) - def _store_room_topic_txn(self, txn, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( @@ -275,13 +205,3 @@ class RoomStore(SQLBaseStore): aliases.extend(e.content['aliases']) defer.returnValue((name, aliases)) - - -class RoomsTable(object): - table_name = "rooms" - - fields = [ - "room_id", - "is_public", - "creator" - ] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4e0e9ab59a..3065b0c1a5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -58,6 +58,10 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after( + self._membership_stream_cache.entity_has_changed, + event.state_key, event.internal_metadata.stream_ordering + ) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -110,6 +114,7 @@ class RoomMemberStore(SQLBaseStore): membership=membership, ).addCallback(self._get_events) + @cached() def get_invites_for_user(self, user_id): """ Get all the invite events for a user Args: @@ -240,7 +245,7 @@ class RoomMemberStore(SQLBaseStore): return rows - @cached() + @cached(max_entries=5000) def get_rooms_for_user(self, user_id): return self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], @@ -287,6 +292,7 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, (user_id, room_id)) yield self.runInteraction("forget_membership", f) self.was_forgotten_at.invalidate_all() + self.who_forgot_in_room.invalidate_all() self.did_forget.invalidate((user_id, room_id)) @cachedInlineCallbacks(num_args=2) @@ -336,3 +342,15 @@ class RoomMemberStore(SQLBaseStore): return rows[0][0] forgot = yield self.runInteraction("did_forget_membership_at", f) defer.returnValue(forgot == 1) + + @cached() + def who_forgot_in_room(self, room_id): + return self._simple_select_list( + table="room_memberships", + retcols=("user_id", "event_id"), + keyvalues={ + "room_id": room_id, + "forgotten": 1, + }, + desc="who_forgot" + ) diff --git a/synapse/storage/schema/delta/11/v11.sql b/synapse/storage/schema/delta/11/v11.sql index 313592221b..e7b4f90127 100644 --- a/synapse/storage/schema/delta/11/v11.sql +++ b/synapse/storage/schema/delta/11/v11.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql index 878c36260a..5964c5aaac 100644 --- a/synapse/storage/schema/delta/12/v12.sql +++ b/synapse/storage/schema/delta/12/v12.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql index 3265924013..5eb93b38b2 100644 --- a/synapse/storage/schema/delta/13/v13.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py index 61232f9757..5c40a77757 100644 --- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py +++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py @@ -1,4 +1,4 @@ -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql index 1d09ad7a15..a831920da6 100644 --- a/synapse/storage/schema/delta/14/v14.sql +++ b/synapse/storage/schema/delta/14/v14.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql index db2e720393..e4f5e76aec 100644 --- a/synapse/storage/schema/delta/15/appservice_txns.sql +++ b/synapse/storage/schema/delta/15/appservice_txns.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/17/drop_indexes.sql b/synapse/storage/schema/delta/17/drop_indexes.sql index 8eb3325a6b..7c9a90e27f 100644 --- a/synapse/storage/schema/delta/17/drop_indexes.sql +++ b/synapse/storage/schema/delta/17/drop_indexes.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/17/server_keys.sql b/synapse/storage/schema/delta/17/server_keys.sql index 513c30a717..70b247a06b 100644 --- a/synapse/storage/schema/delta/17/server_keys.sql +++ b/synapse/storage/schema/delta/17/server_keys.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/18/server_keys_bigger_ints.sql b/synapse/storage/schema/delta/18/server_keys_bigger_ints.sql index c0b0fdfb69..6e0871c92b 100644 --- a/synapse/storage/schema/delta/18/server_keys_bigger_ints.sql +++ b/synapse/storage/schema/delta/18/server_keys_bigger_ints.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/19/event_index.sql b/synapse/storage/schema/delta/19/event_index.sql index 3881fc9897..18b97b4332 100644 --- a/synapse/storage/schema/delta/19/event_index.sql +++ b/synapse/storage/schema/delta/19/event_index.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py index 543e57bbe2..29164732af 100644 --- a/synapse/storage/schema/delta/20/pushers.py +++ b/synapse/storage/schema/delta/20/pushers.py @@ -1,4 +1,4 @@ -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/21/end_to_end_keys.sql b/synapse/storage/schema/delta/21/end_to_end_keys.sql index 8b4a380d11..4c2fb20b77 100644 --- a/synapse/storage/schema/delta/21/end_to_end_keys.sql +++ b/synapse/storage/schema/delta/21/end_to_end_keys.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index 2f64d609fc..d070845477 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql index b182b2b661..7bc061dff6 100644 --- a/synapse/storage/schema/delta/22/receipts_index.sql +++ b/synapse/storage/schema/delta/22/receipts_index.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/23/drop_state_index.sql b/synapse/storage/schema/delta/23/drop_state_index.sql index 07d0ea5cb2..ae09fa0065 100644 --- a/synapse/storage/schema/delta/23/drop_state_index.sql +++ b/synapse/storage/schema/delta/23/drop_state_index.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/23/refresh_tokens.sql b/synapse/storage/schema/delta/23/refresh_tokens.sql index 437b1ac1be..34db0cf12b 100644 --- a/synapse/storage/schema/delta/23/refresh_tokens.sql +++ b/synapse/storage/schema/delta/23/refresh_tokens.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql index e9165d2917..5f508af7a9 100644 --- a/synapse/storage/schema/delta/24/stats_reporting.sql +++ b/synapse/storage/schema/delta/24/stats_reporting.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql index 41a9b59b1b..2ad9e8fa56 100644 --- a/synapse/storage/schema/delta/25/00background_updates.sql +++ b/synapse/storage/schema/delta/25/00background_updates.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index ba48e43792..d3ff2b1779 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -1,4 +1,4 @@ -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/25/guest_access.sql b/synapse/storage/schema/delta/25/guest_access.sql index bdb90e7118..1ea389b471 100644 --- a/synapse/storage/schema/delta/25/guest_access.sql +++ b/synapse/storage/schema/delta/25/guest_access.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/25/history_visibility.sql b/synapse/storage/schema/delta/25/history_visibility.sql index 532cb05151..f468fc1897 100644 --- a/synapse/storage/schema/delta/25/history_visibility.sql +++ b/synapse/storage/schema/delta/25/history_visibility.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql index 527424c998..7a32ce68e4 100644 --- a/synapse/storage/schema/delta/25/tags.sql +++ b/synapse/storage/schema/delta/25/tags.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/26/account_data.sql b/synapse/storage/schema/delta/26/account_data.sql index 3198a0d29c..e395de2b5e 100644 --- a/synapse/storage/schema/delta/26/account_data.sql +++ b/synapse/storage/schema/delta/26/account_data.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/27/account_data.sql b/synapse/storage/schema/delta/27/account_data.sql index 9f25416005..bf0558b5b3 100644 --- a/synapse/storage/schema/delta/27/account_data.sql +++ b/synapse/storage/schema/delta/27/account_data.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/27/forgotten_memberships.sql b/synapse/storage/schema/delta/27/forgotten_memberships.sql index beeb8a288b..e2094f37fe 100644 --- a/synapse/storage/schema/delta/27/forgotten_memberships.sql +++ b/synapse/storage/schema/delta/27/forgotten_memberships.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py index 8d4a981975..f8c16391a2 100644 --- a/synapse/storage/schema/delta/27/ts.py +++ b/synapse/storage/schema/delta/27/ts.py @@ -1,4 +1,4 @@ -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/delta/28/event_push_actions.sql b/synapse/storage/schema/delta/28/event_push_actions.sql new file mode 100644 index 0000000000..4d519849df --- /dev/null +++ b/synapse/storage/schema/delta/28/event_push_actions.sql @@ -0,0 +1,27 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_push_actions( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) +); + + +CREATE INDEX event_push_actions_room_id_event_id_user_id_profile_tag on event_push_actions(room_id, event_id, user_id, profile_tag); +CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id); diff --git a/synapse/storage/schema/delta/28/events_room_stream.sql b/synapse/storage/schema/delta/28/events_room_stream.sql new file mode 100644 index 0000000000..200c35e6e2 --- /dev/null +++ b/synapse/storage/schema/delta/28/events_room_stream.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +CREATE INDEX events_room_stream on events(room_id, stream_ordering); diff --git a/synapse/storage/schema/delta/28/public_roms_index.sql b/synapse/storage/schema/delta/28/public_roms_index.sql new file mode 100644 index 0000000000..ba62a974a4 --- /dev/null +++ b/synapse/storage/schema/delta/28/public_roms_index.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +CREATE INDEX public_room_index on rooms(is_public); diff --git a/synapse/storage/schema/delta/28/receipts_user_id_index.sql b/synapse/storage/schema/delta/28/receipts_user_id_index.sql new file mode 100644 index 0000000000..452a1b3c6c --- /dev/null +++ b/synapse/storage/schema/delta/28/receipts_user_id_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2015, 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX receipts_linearized_user ON receipts_linearized( + user_id +); diff --git a/synapse/storage/schema/delta/28/upgrade_times.sql b/synapse/storage/schema/delta/28/upgrade_times.sql new file mode 100644 index 0000000000..3e4a9ab455 --- /dev/null +++ b/synapse/storage/schema/delta/28/upgrade_times.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Stores the timestamp when a user upgraded from a guest to a full user, if + * that happened. + */ + +ALTER TABLE users ADD COLUMN upgrade_ts BIGINT; diff --git a/synapse/storage/schema/delta/28/users_is_guest.sql b/synapse/storage/schema/delta/28/users_is_guest.sql new file mode 100644 index 0000000000..21d2b420bf --- /dev/null +++ b/synapse/storage/schema/delta/28/users_is_guest.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE users ADD is_guest SMALLINT DEFAULT 0 NOT NULL; +/* + * NB: any guest users created between 27 and 28 will be incorrectly + * marked as not guests: we don't bother to fill these in correctly + * because guest access is not really complete in 27 anyway so it's + * very unlikley there will be any guest users created. + */ diff --git a/synapse/storage/schema/delta/29/push_actions.sql b/synapse/storage/schema/delta/29/push_actions.sql new file mode 100644 index 0000000000..7e7b09820a --- /dev/null +++ b/synapse/storage/schema/delta/29/push_actions.sql @@ -0,0 +1,31 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE event_push_actions ADD COLUMN topological_ordering BIGINT; +ALTER TABLE event_push_actions ADD COLUMN stream_ordering BIGINT; +ALTER TABLE event_push_actions ADD COLUMN notif SMALLINT; +ALTER TABLE event_push_actions ADD COLUMN highlight SMALLINT; + +UPDATE event_push_actions SET stream_ordering = ( + SELECT stream_ordering FROM events WHERE event_id = event_push_actions.event_id +), topological_ordering = ( + SELECT topological_ordering FROM events WHERE event_id = event_push_actions.event_id +); + +UPDATE event_push_actions SET notif = 1, highlight = 0; + +CREATE INDEX event_push_actions_rm_tokens on event_push_actions( + user_id, room_id, topological_ordering, stream_ordering +); diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index f7020f7793..52eec88357 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index 636b2d3353..00ce85980e 100644 --- a/synapse/storage/schema/full_schemas/11/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index 1901654ac2..dfbbf9fd54 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index afc142045e..ca0ca1b694 100644 --- a/synapse/storage/schema/full_schemas/11/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index e927e581d1..9c264d6ece 100644 --- a/synapse/storage/schema/full_schemas/11/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index d8d82e9fe3..492725994c 100644 --- a/synapse/storage/schema/full_schemas/11/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index 26e4204437..b314e6df75 100644 --- a/synapse/storage/schema/full_schemas/11/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index 69621955d4..318f0d9aa5 100644 --- a/synapse/storage/schema/full_schemas/11/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 5027b1e3f6..71a91f8ec9 100644 --- a/synapse/storage/schema/full_schemas/11/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index ffd164ab71..b901e0f017 100644 --- a/synapse/storage/schema/full_schemas/11/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index cc5b54f5aa..a3f4a0a790 100644 --- a/synapse/storage/schema/full_schemas/11/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index eec3da3c35..6c1d4c34a1 100644 --- a/synapse/storage/schema/full_schemas/11/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql index d382d63fbd..aee0e68473 100644 --- a/synapse/storage/schema/full_schemas/16/application_services.sql +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql index f7020f7793..52eec88357 100644 --- a/synapse/storage/schema/full_schemas/16/event_edges.sql +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql index 636b2d3353..00ce85980e 100644 --- a/synapse/storage/schema/full_schemas/16/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql index 576653a3c9..ba5346806e 100644 --- a/synapse/storage/schema/full_schemas/16/im.sql +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql index afc142045e..ca0ca1b694 100644 --- a/synapse/storage/schema/full_schemas/16/keys.sql +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql index dacbda40ca..8f3759bb2a 100644 --- a/synapse/storage/schema/full_schemas/16/media_repository.sql +++ b/synapse/storage/schema/full_schemas/16/media_repository.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql index 80088413ba..283136df20 100644 --- a/synapse/storage/schema/full_schemas/16/presence.sql +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql index 934be86520..c04f4747d9 100644 --- a/synapse/storage/schema/full_schemas/16/profiles.sql +++ b/synapse/storage/schema/full_schemas/16/profiles.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql index 9387f920f0..e44465cf45 100644 --- a/synapse/storage/schema/full_schemas/16/push.sql +++ b/synapse/storage/schema/full_schemas/16/push.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql index 69621955d4..318f0d9aa5 100644 --- a/synapse/storage/schema/full_schemas/16/redactions.sql +++ b/synapse/storage/schema/full_schemas/16/redactions.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql index 412bb97fad..d47da3b12f 100644 --- a/synapse/storage/schema/full_schemas/16/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql index 705cac6ce9..96391a8f0e 100644 --- a/synapse/storage/schema/full_schemas/16/state.sql +++ b/synapse/storage/schema/full_schemas/16/state.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql index 1ab77cdb63..14b67cce25 100644 --- a/synapse/storage/schema/full_schemas/16/transactions.sql +++ b/synapse/storage/schema/full_schemas/16/transactions.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql index d2fa3122da..f013aa8b18 100644 --- a/synapse/storage/schema/full_schemas/16/users.sql +++ b/synapse/storage/schema/full_schemas/16/users.sql @@ -1,4 +1,4 @@ -/* Copyright 2014, 2015 OpenMarket Ltd +/* Copyright 2014-2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index d682608aa0..a7ade69986 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -1,4 +1,4 @@ -/* Copyright 2015 OpenMarket Ltd +/* Copyright 2015, 2016 OpenMarket Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 6cb5e73b6e..59ac7f424c 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index b070be504d..70c6a06cd1 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 80e9b63f50..372b540002 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -171,41 +171,43 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - def _get_state_groups_from_groups(self, groups_and_types): + def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> state event ids - - Args: - groups_and_types (list): list of 2-tuple (`group`, `types`) """ - def f(txn): - results = {} - for group, types in groups_and_types: - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - - sql = ( - "SELECT event_id FROM state_groups_state WHERE" - " state_group = ? %s" - ) % (where_clause,) + def f(txn, groups): + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" - args = [group] - if types is not None: - args.extend([i for typ in types for i in typ]) + sql = ( + "SELECT state_group, event_id FROM state_groups_state WHERE" + " state_group IN (%s) %s" % ( + ",".join("?" for _ in groups), + where_clause, + ) + ) - txn.execute(sql, args) + args = list(groups) + if types is not None: + args.extend([i for typ in types for i in typ]) - results[group] = [r[0] for r in txn.fetchall()] + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + results = {} + for row in rows: + results.setdefault(row["state_group"], []).append(row["event_id"]) return results - return self.runInteraction( - "_get_state_groups_from_groups", - f, - ) + chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] + for chunk in chunks: + return self.runInteraction( + "_get_state_groups_from_groups", + f, chunk + ) @defer.inlineCallbacks def get_state_for_events(self, event_ids, types): @@ -264,26 +266,20 @@ class StateStore(SQLBaseStore): ) @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", - num_args=1) + num_args=1, inlineCallbacks=True) def _get_state_group_for_events(self, event_ids): """Returns mapping event_id -> state_group """ - def f(txn): - results = {} - for event_id in event_ids: - results[event_id] = self._simple_select_one_onecol_txn( - txn, - table="event_to_state_groups", - keyvalues={ - "event_id": event_id, - }, - retcol="state_group", - allow_none=True, - ) - - return results + rows = yield self._simple_select_many_batch( + table="event_to_state_groups", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id", "state_group",), + desc="_get_state_group_for_events", + ) - return self.runInteraction("_get_state_group_for_events", f) + defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` @@ -355,7 +351,7 @@ class StateStore(SQLBaseStore): all events are returned. """ results = {} - missing_groups_and_types = [] + missing_groups = [] if types is not None: for group in set(groups): state_dict, missing_types, got_all = self._get_some_state_from_cache( @@ -364,7 +360,7 @@ class StateStore(SQLBaseStore): results[group] = state_dict if not got_all: - missing_groups_and_types.append((group, missing_types)) + missing_groups.append(group) else: for group in set(groups): state_dict, got_all = self._get_all_state_from_cache( @@ -373,9 +369,9 @@ class StateStore(SQLBaseStore): results[group] = state_dict if not got_all: - missing_groups_and_types.append((group, None)) + missing_groups.append(group) - if not missing_groups_and_types: + if not missing_groups: defer.returnValue({ group: { type_tuple: event @@ -389,7 +385,7 @@ class StateStore(SQLBaseStore): cache_seq_num = self._state_group_cache.sequence group_state_dict = yield self._get_state_groups_from_groups( - missing_groups_and_types + missing_groups, types ) state_events = yield self._get_events( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index be8ba76aae..367ffc9543 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_fn import logging @@ -77,7 +77,6 @@ def upper_bound(token): class StreamStore(SQLBaseStore): - @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the @@ -157,7 +156,147 @@ class StreamStore(SQLBaseStore): results = yield self.runInteraction("get_appservice_room_stream", f) defer.returnValue(results) - @log_function + @defer.inlineCallbacks + def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0): + from_id = RoomStreamToken.parse_stream_token(from_key).stream + + room_ids = yield self._events_stream_cache.get_entities_changed( + room_ids, from_id + ) + + if not room_ids: + defer.returnValue({}) + + results = {} + room_ids = list(room_ids) + for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): + res = yield defer.gatherResults([ + preserve_fn(self.get_room_events_stream_for_room)( + room_id, from_key, to_key, limit, + ) + for room_id in room_ids + ]) + results.update(dict(zip(rm_ids, res))) + + defer.returnValue(results) + + @defer.inlineCallbacks + def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + defer.returnValue(([], from_key)) + + if from_id: + has_changed = yield self._events_stream_cache.has_entity_changed( + room_id, from_id + ) + + if not has_changed: + defer.returnValue(([], from_key)) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, from_id, to_id, limit)) + else: + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering DESC LIMIT ?" + ) + txn.execute(sql, (room_id, to_id, limit)) + + rows = self.cursor_to_dict(txn) + + return rows + + rows = yield self.runInteraction("get_room_events_stream_for_room", f) + + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows, topo_order=False) + + ret.reverse() + + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + defer.returnValue((ret, key)) + + @defer.inlineCallbacks + def get_membership_changes_for_user(self, user_id, from_key, to_key): + if from_key is not None: + from_id = RoomStreamToken.parse_stream_token(from_key).stream + else: + from_id = None + to_id = RoomStreamToken.parse_stream_token(to_key).stream + + if from_key == to_key: + defer.returnValue([]) + + if from_id: + has_changed = self._membership_stream_cache.has_entity_changed( + user_id, int(from_id) + ) + if not has_changed: + defer.returnValue([]) + + def f(txn): + if from_id is not None: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND e.stream_ordering > ? AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + ) + txn.execute(sql, (user_id, from_id, to_id,)) + else: + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + ) + txn.execute(sql, (user_id, to_id,)) + rows = self.cursor_to_dict(txn) + + return rows + + rows = yield self.runInteraction("get_membership_changes_for_user", f) + + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows, topo_order=False) + + defer.returnValue(ret) + def get_room_events_stream( self, user_id, @@ -174,7 +313,8 @@ class StreamStore(SQLBaseStore): "SELECT c.room_id FROM history_visibility AS h" " INNER JOIN current_state_events AS c" " ON h.event_id = c.event_id" - " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % ( + " WHERE c.room_id IN (%s)" + " AND h.history_visibility = 'world_readable'" % ( ",".join(map(lambda _: "?", room_ids)) ) ) @@ -187,11 +327,6 @@ class StreamStore(SQLBaseStore): " WHERE m.user_id = ? AND m.membership = 'join'" ) current_room_membership_args = [user_id] - if room_ids: - current_room_membership_sql += " AND m.room_id in (%s)" % ( - ",".join(map(lambda _: "?", room_ids)) - ) - current_room_membership_args = [user_id] + room_ids # We also want to get any membership events about that user, e.g. # invites or leave notifications. @@ -430,10 +565,23 @@ class StreamStore(SQLBaseStore): table="events", keyvalues={"event_id": event_id}, retcols=("stream_ordering", "topological_ordering"), + desc="get_topological_token_for_event", ).addCallback(lambda row: "t%d-%d" % ( row["topological_ordering"], row["stream_ordering"],) ) + def get_max_topological_token_for_stream_and_room(self, room_id, stream_key): + sql = ( + "SELECT max(topological_ordering) FROM events" + " WHERE room_id = ? AND stream_ordering < ?" + ) + return self._execute( + "get_max_topological_token_for_stream_and_room", None, + sql, room_id, stream_key, + ).addCallback( + lambda r: r[0][0] if r else 0 + ) + def _get_max_topological_txn(self, txn): txn.execute( "SELECT MAX(topological_ordering) FROM events" @@ -444,27 +592,21 @@ class StreamStore(SQLBaseStore): rows = txn.fetchall() return rows[0][0] if rows else 0 - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) - - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) - - logger.debug("min_token is: %s", self.min_token) - - defer.returnValue(self.min_token) - @staticmethod - def _set_before_and_after(events, rows): + def _set_before_and_after(events, rows, topo_order=True): for event, row in zip(events, rows): stream = row["stream_ordering"] - topo = event.depth + if topo_order: + topo = event.depth + else: + topo = None internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) + internal.order = ( + int(topo) if topo else 0, + int(stream), + ) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index f520f60c6c..e1a9c0c261 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from twisted.internet import defer -from .util.id_generators import StreamIdGenerator import ujson as json import logging @@ -25,13 +24,6 @@ logger = logging.getLogger(__name__) class TagsStore(SQLBaseStore): - def __init__(self, hs): - super(TagsStore, self).__init__(hs) - - self._account_data_id_gen = StreamIdGenerator( - "account_data_max_stream_id", "stream_id" - ) - def get_max_account_data_stream_id(self): """Get the current max stream id for the private user data stream @@ -87,6 +79,12 @@ class TagsStore(SQLBaseStore): room_ids = [row[0] for row in txn.fetchall()] return room_ids + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(stream_id) + ) + if not changed: + defer.returnValue({}) + room_ids = yield self.runInteraction( "get_updated_tags", get_updated_tags_txn ) @@ -184,6 +182,11 @@ class TagsStore(SQLBaseStore): next_id(int): The the revision to advance to. """ + txn.call_after( + self._account_data_stream_cache.entity_has_changed, + user_id, next_id + ) + update_max_id_sql = ( "UPDATE account_data_max_stream_id" " SET stream_id = ?" diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ad099775eb..4475c451c1 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached -from collections import namedtuple - from canonicaljson import encode_canonical_json import logging @@ -50,12 +48,15 @@ class TransactionStore(SQLBaseStore): def _get_received_txn_response(self, txn, transaction_id, origin): result = self._simple_select_one_txn( txn, - table=ReceivedTransactionsTable.table_name, + table="received_transactions", keyvalues={ "transaction_id": transaction_id, "origin": origin, }, - retcols=ReceivedTransactionsTable.fields, + retcols=( + "transaction_id", "origin", "ts", "response_code", "response_json", + "has_been_referenced", + ), allow_none=True, ) @@ -79,7 +80,7 @@ class TransactionStore(SQLBaseStore): """ return self._simple_insert( - table=ReceivedTransactionsTable.table_name, + table="received_transactions", values={ "transaction_id": transaction_id, "origin": origin, @@ -136,7 +137,7 @@ class TransactionStore(SQLBaseStore): self._simple_insert_txn( txn, - table=SentTransactions.table_name, + table="sent_transactions", values={ "id": next_id, "transaction_id": transaction_id, @@ -171,7 +172,7 @@ class TransactionStore(SQLBaseStore): code, response_json): self._simple_update_one_txn( txn, - table=SentTransactions.table_name, + table="sent_transactions", keyvalues={ "transaction_id": transaction_id, "destination": destination, @@ -229,11 +230,11 @@ class TransactionStore(SQLBaseStore): def _get_destination_retry_timings(self, txn, destination): result = self._simple_select_one_txn( txn, - table=DestinationsTable.table_name, + table="destinations", keyvalues={ "destination": destination, }, - retcols=DestinationsTable.fields, + retcols=("destination", "retry_last_ts", "retry_interval"), allow_none=True, ) @@ -304,52 +305,3 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) - - -class ReceivedTransactionsTable(object): - table_name = "received_transactions" - - fields = [ - "transaction_id", - "origin", - "ts", - "response_code", - "response_json", - "has_been_referenced", - ] - - -class SentTransactions(object): - table_name = "sent_transactions" - - fields = [ - "id", - "transaction_id", - "destination", - "ts", - "response_code", - "response_json", - ] - - EntryType = namedtuple("SentTransactionsEntry", fields) - - -class TransactionsToPduTable(object): - table_name = "transaction_id_to_pdu" - - fields = [ - "transaction_id", - "destination", - "pdu_id", - "pdu_origin", - ] - - -class DestinationsTable(object): - table_name = "destinations" - - fields = [ - "destination", - "retry_last_ts", - "retry_interval", - ] diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py index c488b10d3c..bfebb0f644 100644 --- a/synapse/storage/util/__init__.py +++ b/synapse/storage/util/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index e956df62c7..5c522f4ab9 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -72,28 +72,24 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - def __init__(self, table, column): + def __init__(self, db_conn, table, column): self.table = table self.column = column self._lock = threading.Lock() - self._current_max = None + cur = db_conn.cursor() + self._current_max = self._get_or_compute_current_max(cur) + cur.close() + self._unfinished_ids = deque() - @defer.inlineCallbacks def get_next(self, store): """ Usage: with yield stream_id_gen.get_next as stream_id: # ... persist event ... """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: self._current_max += 1 next_id = self._current_max @@ -108,21 +104,14 @@ class StreamIdGenerator(object): with self._lock: self._unfinished_ids.remove(next_id) - defer.returnValue(manager()) + return manager() - @defer.inlineCallbacks def get_next_mult(self, store, n): """ Usage: with yield stream_id_gen.get_next(store, n) as stream_ids: # ... persist events ... """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: next_ids = range(self._current_max + 1, self._current_max + n + 1) self._current_max += n @@ -139,24 +128,17 @@ class StreamIdGenerator(object): for next_id in next_ids: self._unfinished_ids.remove(next_id) - defer.returnValue(manager()) + return manager() - @defer.inlineCallbacks def get_max_token(self, store): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: if self._unfinished_ids: - defer.returnValue(self._unfinished_ids[0] - 1) + return self._unfinished_ids[0] - 1 - defer.returnValue(self._current_max) + return self._current_max def _get_or_compute_current_max(self, txn): with self._lock: |