diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 57863bba4d..045ae6c03f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -94,7 +94,8 @@ class DataStore(RoomMemberStore, RoomStore,
)
self._stream_id_gen = StreamIdGenerator(
- db_conn, "events", "stream_ordering"
+ db_conn, "events", "stream_ordering",
+ extra_tables=[("local_invites", "stream_id")]
)
self._backfill_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering", step=-1
@@ -176,39 +177,6 @@ class DataStore(RoomMemberStore, RoomStore,
self.__presence_on_startup = None
return active_on_startup
- def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
- # Fetch a mapping of room_id -> max stream position for "recent" rooms.
- # It doesn't really matter how many we get, the StreamChangeCache will
- # do the right thing to ensure it respects the max size of cache.
- sql = (
- "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
- " WHERE %(stream)s > ? - 100000"
- " GROUP BY %(entity)s"
- ) % {
- "table": table,
- "entity": entity_column,
- "stream": stream_column,
- }
-
- sql = self.database_engine.convert_param_style(sql)
-
- txn = db_conn.cursor()
- txn.execute(sql, (int(max_value),))
- rows = txn.fetchall()
- txn.close()
-
- cache = {
- row[0]: int(row[1])
- for row in rows
- }
-
- if cache:
- min_val = min(cache.values())
- else:
- min_val = max_value
-
- return cache, min_val
-
def _get_active_presence(self, db_conn):
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b75b79df36..04d7fcf6d6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -816,6 +816,40 @@ class SQLBaseStore(object):
self._next_stream_id += 1
return i
+ def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
+ max_value):
+ # Fetch a mapping of room_id -> max stream position for "recent" rooms.
+ # It doesn't really matter how many we get, the StreamChangeCache will
+ # do the right thing to ensure it respects the max size of cache.
+ sql = (
+ "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
+ " WHERE %(stream)s > ? - 100000"
+ " GROUP BY %(entity)s"
+ ) % {
+ "table": table,
+ "entity": entity_column,
+ "stream": stream_column,
+ }
+
+ sql = self.database_engine.convert_param_style(sql)
+
+ txn = db_conn.cursor()
+ txn.execute(sql, (int(max_value),))
+ rows = txn.fetchall()
+ txn.close()
+
+ cache = {
+ row[0]: int(row[1])
+ for row in rows
+ }
+
+ if cache:
+ min_val = min(cache.values())
+ else:
+ min_val = max_value
+
+ return cache, min_val
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index a48230b93f..7bb5de1fe7 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -26,13 +26,13 @@ SUPPORTED_MODULE = {
}
-def create_engine(config):
- name = config.database_config["name"]
+def create_engine(database_config):
+ name = database_config["name"]
engine_class = SUPPORTED_MODULE.get(name, None)
if engine_class:
module = importlib.import_module(name)
- return engine_class(module, config=config)
+ return engine_class(module)
raise RuntimeError(
"Unsupported database engine '%s'" % (name,)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index a09685b4df..c2290943b4 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,18 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import prepare_database
-
from ._base import IncorrectDatabaseSetup
class PostgresEngine(object):
single_threaded = False
- def __init__(self, database_module, config):
+ def __init__(self, database_module):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)
- self.config = config
def check_database(self, txn):
txn.execute("SHOW SERVER_ENCODING")
@@ -44,9 +41,6 @@ class PostgresEngine(object):
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
- def prepare_database(self, db_conn):
- prepare_database(db_conn, self, config=self.config)
-
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
return error.pgcode in ["40001", "40P01"]
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 522b905949..14203aa500 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import (
- prepare_database, prepare_sqlite3_database
-)
+from synapse.storage.prepare_database import prepare_database
import struct
@@ -23,9 +21,8 @@ import struct
class Sqlite3Engine(object):
single_threaded = True
- def __init__(self, database_module, config):
+ def __init__(self, database_module):
self.module = database_module
- self.config = config
def check_database(self, txn):
pass
@@ -34,13 +31,9 @@ class Sqlite3Engine(object):
return sql
def on_new_connection(self, db_conn):
- self.prepare_database(db_conn)
+ prepare_database(db_conn, self, config=None)
db_conn.create_function("rank", 1, _rank)
- def prepare_database(self, db_conn):
- prepare_sqlite3_database(db_conn)
- prepare_database(db_conn, self, config=self.config)
-
def is_deadlock(self, error):
return False
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3489315e0d..0827946207 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -163,6 +163,22 @@ class EventFederationStore(SQLBaseStore):
room_id,
)
+ @defer.inlineCallbacks
+ def get_max_depth_of_events(self, event_ids):
+ sql = (
+ "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
+ ) % (",".join(["?"] * len(event_ids)),)
+
+ rows = yield self._execute(
+ "get_max_depth_of_events", None,
+ sql, *event_ids
+ )
+
+ if rows:
+ defer.returnValue(rows[0][0])
+ else:
+ defer.returnValue(1)
+
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c4dc3b3d51..ee87a71719 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -367,7 +367,8 @@ class EventsStore(SQLBaseStore):
event
for event, _ in events_and_contexts
if event.type == EventTypes.Member
- ]
+ ],
+ backfilled=backfilled,
)
def event_dict(event):
@@ -485,14 +486,8 @@ class EventsStore(SQLBaseStore):
return
for event, _ in state_events_and_contexts:
- if (not event.internal_metadata.is_invite_from_remote()
- and event.internal_metadata.is_outlier()):
- # Outlier events generally shouldn't clobber the current state.
- # However invites from remote severs for rooms we aren't in
- # are a bit special: they don't come with any associated
- # state so are technically an outlier, however all the
- # client-facing code assumes that they are in the current
- # state table so we insert the event anyway.
+ if event.internal_metadata.is_outlier():
+ # Outlier events shouldn't clobber the current state.
continue
if context.rejected:
@@ -1139,7 +1134,7 @@ class EventsStore(SQLBaseStore):
upper_bound = current_forward_id
sql = (
- "SELECT -event_stream_ordering FROM current_state_resets"
+ "SELECT event_stream_ordering FROM current_state_resets"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" ORDER BY event_stream_ordering ASC"
@@ -1148,7 +1143,7 @@ class EventsStore(SQLBaseStore):
state_resets = txn.fetchall()
sql = (
- "SELECT -event_stream_ordering, event_id, state_group"
+ "SELECT event_stream_ordering, event_id, state_group"
" FROM ex_outlier_stream"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 3f29aad1e8..00833422af 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 30
+SCHEMA_VERSION = 31
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -53,6 +53,9 @@ class UpgradeDatabaseException(PrepareDatabaseException):
def prepare_database(db_conn, database_engine, config):
"""Prepares a database for usage. Will either create all necessary tables
or upgrade from an older schema version.
+
+ If `config` is None then prepare_database will assert that no upgrade is
+ necessary, *or* will create a fresh database if the database is empty.
"""
try:
cur = db_conn.cursor()
@@ -60,13 +63,18 @@ def prepare_database(db_conn, database_engine, config):
if version_info:
user_version, delta_files, upgraded = version_info
- _upgrade_existing_database(
- cur, user_version, delta_files, upgraded, database_engine, config
- )
- else:
- _setup_new_database(cur, database_engine, config)
- # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+ if config is None:
+ if user_version != SCHEMA_VERSION:
+ # If we don't pass in a config file then we are expecting to
+ # have already upgraded the DB.
+ raise UpgradeDatabaseException("Database needs to be upgraded")
+ else:
+ _upgrade_existing_database(
+ cur, user_version, delta_files, upgraded, database_engine, config
+ )
+ else:
+ _setup_new_database(cur, database_engine)
cur.close()
db_conn.commit()
@@ -75,7 +83,7 @@ def prepare_database(db_conn, database_engine, config):
raise
-def _setup_new_database(cur, database_engine, config):
+def _setup_new_database(cur, database_engine):
"""Sets up the database by finding a base set of "full schemas" and then
applying any necessary deltas.
@@ -148,12 +156,13 @@ def _setup_new_database(cur, database_engine, config):
applied_delta_files=[],
upgraded=False,
database_engine=database_engine,
- config=config,
+ config=None,
+ is_empty=True,
)
def _upgrade_existing_database(cur, current_version, applied_delta_files,
- upgraded, database_engine, config):
+ upgraded, database_engine, config, is_empty=False):
"""Upgrades an existing database.
Delta files can either be SQL stored in *.sql files, or python modules
@@ -246,7 +255,9 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
module_name, absolute_path, python_file
)
logger.debug("Running script %s", relative_path)
- module.run_upgrade(cur, database_engine, config=config)
+ module.run_create(cur, database_engine)
+ if not is_empty:
+ module.run_upgrade(cur, database_engine, config=config)
elif ext == ".pyc":
# Sometimes .pyc files turn up anyway even though we've
# disabled their generation; e.g. from distribution package
@@ -361,36 +372,3 @@ def _get_or_create_schema_state(txn, database_engine):
return current_version, applied_deltas, upgraded
return None
-
-
-def prepare_sqlite3_database(db_conn):
- """This function should be called before `prepare_database` on sqlite3
- databases.
-
- Since we changed the way we store the current schema version and handle
- updates to schemas, we need a way to upgrade from the old method to the
- new. This only affects sqlite databases since they were the only ones
- supported at the time.
- """
- with db_conn:
- schema_path = os.path.join(
- dir_path, "schema", "schema_version.sql",
- )
- create_schema = read_schema(schema_path)
- db_conn.executescript(create_schema)
-
- c = db_conn.execute("SELECT * FROM schema_version")
- rows = c.fetchall()
- c.close()
-
- if not rows:
- c = db_conn.execute("PRAGMA user_version")
- row = c.fetchone()
- c.close()
-
- if row and row[0]:
- db_conn.execute(
- "REPLACE INTO schema_version (version, upgraded)"
- " VALUES (?,?)",
- (row[0], False)
- )
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 4befebc8e2..7fdd84bbdc 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -160,8 +160,8 @@ class ReceiptsStore(SQLBaseStore):
"content": content,
}])
- @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids",
- num_args=3, inlineCallbacks=True)
+ @cachedList(cached_method_name="get_linearized_receipts_for_room",
+ list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
defer.returnValue({})
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index d46a963bb8..1f71773aaa 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -319,7 +319,7 @@ class RegistrationStore(SQLBaseStore):
defer.returnValue(res if res else False)
- @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1,
+ @cachedList(cached_method_name="is_guest", list_name="user_ids", num_args=1,
inlineCallbacks=True)
def are_guests(self, user_ids):
sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 430b49c12e..66e7a40e3c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -36,7 +36,7 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
- def _store_room_members_txn(self, txn, events):
+ def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
"""
self._simple_insert_many_txn(
@@ -62,6 +62,64 @@ class RoomMemberStore(SQLBaseStore):
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
)
+ txn.call_after(
+ self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+ )
+
+ # We update the local_invites table only if the event is "current",
+ # i.e., its something that has just happened.
+ # The only current event that can also be an outlier is if its an
+ # invite that has come in across federation.
+ is_new_state = not backfilled and (
+ not event.internal_metadata.is_outlier()
+ or event.internal_metadata.is_invite_from_remote()
+ )
+ is_mine = self.hs.is_mine_id(event.state_key)
+ if is_new_state and is_mine:
+ if event.membership == Membership.INVITE:
+ self._simple_insert_txn(
+ txn,
+ table="local_invites",
+ values={
+ "event_id": event.event_id,
+ "invitee": event.state_key,
+ "inviter": event.sender,
+ "room_id": event.room_id,
+ "stream_id": event.internal_metadata.stream_ordering,
+ }
+ )
+ else:
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ txn.execute(sql, (
+ event.internal_metadata.stream_ordering,
+ event.event_id,
+ event.room_id,
+ event.state_key,
+ ))
+
+ @defer.inlineCallbacks
+ def locally_reject_invite(self, user_id, room_id):
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ def f(txn, stream_ordering):
+ txn.execute(sql, (
+ stream_ordering,
+ True,
+ room_id,
+ user_id,
+ ))
+
+ with self._stream_id_gen.get_next() as stream_ordering:
+ yield self.runInteraction("locally_reject_invite", f, stream_ordering)
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -127,6 +185,24 @@ class RoomMemberStore(SQLBaseStore):
user_id, [Membership.INVITE]
)
+ @defer.inlineCallbacks
+ def get_invite_for_user_in_room(self, user_id, room_id):
+ """Gets the invite for the given user and room
+
+ Args:
+ user_id (str)
+ room_id (str)
+
+ Returns:
+ Deferred: Resolves to either a RoomsForUser or None if no invite was
+ found.
+ """
+ invites = yield self.get_invited_rooms_for_user(user_id)
+ for invite in invites:
+ if invite.room_id == room_id:
+ defer.returnValue(invite)
+ defer.returnValue(None)
+
def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user
Args:
@@ -163,29 +239,55 @@ class RoomMemberStore(SQLBaseStore):
def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
membership_list):
- where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
- " OR ".join(["membership = ?" for _ in membership_list]),
- )
- args = [user_id]
- args.extend(membership_list)
+ do_invite = Membership.INVITE in membership_list
+ membership_list = [m for m in membership_list if m != Membership.INVITE]
- sql = (
- "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
- " FROM current_state_events as c"
- " INNER JOIN room_memberships as m"
- " ON m.event_id = c.event_id"
- " INNER JOIN events as e"
- " ON e.event_id = c.event_id"
- " AND m.room_id = c.room_id"
- " AND m.user_id = c.state_key"
- " WHERE %s"
- ) % (where_clause,)
+ results = []
+ if membership_list:
+ where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
+ " OR ".join(["membership = ?" for _ in membership_list]),
+ )
+
+ args = [user_id]
+ args.extend(membership_list)
+
+ sql = (
+ "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+ " FROM current_state_events as c"
+ " INNER JOIN room_memberships as m"
+ " ON m.event_id = c.event_id"
+ " INNER JOIN events as e"
+ " ON e.event_id = c.event_id"
+ " AND m.room_id = c.room_id"
+ " AND m.user_id = c.state_key"
+ " WHERE %s"
+ ) % (where_clause,)
+
+ txn.execute(sql, args)
+ results = [
+ RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+ ]
+
+ if do_invite:
+ sql = (
+ "SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
+ " FROM local_invites as i"
+ " INNER JOIN events as e USING (event_id)"
+ " WHERE invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ txn.execute(sql, (user_id,))
+ results.extend(RoomsForUser(
+ room_id=r["room_id"],
+ sender=r["inviter"],
+ event_id=r["event_id"],
+ stream_ordering=r["stream_ordering"],
+ membership=Membership.INVITE,
+ ) for r in self.cursor_to_dict(txn))
- txn.execute(sql, args)
- return [
- RoomsForUser(**r) for r in self.cursor_to_dict(txn)
- ]
+ return results
@cached(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):
diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
index 5c40a77757..8755bb2e49 100644
--- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py
+++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
@@ -18,7 +18,7 @@ import logging
logger = logging.getLogger(__name__)
-def run_upgrade(cur, *args, **kwargs):
+def run_create(cur, *args, **kwargs):
cur.execute("SELECT id, regex FROM application_services_regex")
for row in cur.fetchall():
try:
@@ -35,3 +35,7 @@ def run_upgrade(cur, *args, **kwargs):
"UPDATE application_services_regex SET regex=? WHERE id=?",
(new_regex, row[0])
)
+
+
+def run_upgrade(*args, **kwargs):
+ pass
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 29164732af..147496a38b 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -27,7 +27,7 @@ import logging
logger = logging.getLogger(__name__)
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
logger.info("Porting pushers table...")
cur.execute("""
CREATE TABLE IF NOT EXISTS pushers2 (
@@ -74,3 +74,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)
+
+
+def run_upgrade(*args, **kwargs):
+ pass
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index d3ff2b1779..4269ac69ad 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -43,7 +43,7 @@ SQLITE_TABLE = (
)
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
for statement in get_statements(POSTGRES_TABLE.splitlines()):
cur.execute(statement)
@@ -76,3 +76,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
sql = database_engine.convert_param_style(sql)
cur.execute(sql, ("event_search", progress_json))
+
+
+def run_upgrade(*args, **kwargs):
+ pass
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index f8c16391a2..71b12a2731 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -27,7 +27,7 @@ ALTER_TABLE = (
)
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(ALTER_TABLE.splitlines()):
cur.execute(statement)
@@ -55,3 +55,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
sql = database_engine.convert_param_style(sql)
cur.execute(sql, ("event_origin_server_ts", progress_json))
+
+
+def run_upgrade(*args, **kwargs):
+ pass
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 4f6e9dd540..b417e3ac08 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -18,7 +18,7 @@ from synapse.storage.appservice import ApplicationServiceStore
logger = logging.getLogger(__name__)
-def run_upgrade(cur, database_engine, config, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
# NULL indicates user was not registered by an appservice.
try:
cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
@@ -26,6 +26,8 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
# Maybe we already added the column? Hope so...
pass
+
+def run_upgrade(cur, database_engine, config, *args, **kwargs):
cur.execute("SELECT name FROM users")
rows = cur.fetchall()
diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql
new file mode 100644
index 0000000000..2c57846d5a
--- /dev/null
+++ b/synapse/storage/schema/delta/31/invites.sql
@@ -0,0 +1,42 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE local_invites(
+ stream_id BIGINT NOT NULL,
+ inviter TEXT NOT NULL,
+ invitee TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ locally_rejected TEXT,
+ replaced_by TEXT
+);
+
+-- Insert all invites for local users into new `invites` table
+INSERT INTO local_invites SELECT
+ stream_ordering as stream_id,
+ sender as inviter,
+ state_key as invitee,
+ event_id,
+ room_id,
+ NULL as locally_rejected,
+ NULL as replaced_by
+ FROM events
+ NATURAL JOIN current_state_events
+ NATURAL JOIN room_memberships
+ WHERE membership = 'invite' AND state_key IN (SELECT name FROM users);
+
+CREATE INDEX local_invites_id ON local_invites(stream_id);
+CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e9f9406014..c5d2a3a6df 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -273,8 +273,8 @@ class StateStore(SQLBaseStore):
desc="_get_state_group_for_event",
)
- @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
- num_args=1, inlineCallbacks=True)
+ @cachedList(cached_method_name="_get_state_group_for_event",
+ list_name="event_ids", num_args=1, inlineCallbacks=True)
def _get_state_group_for_events(self, event_ids):
"""Returns mapping event_id -> state_group
"""
|