diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e7443f2838..c46b653f11 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -42,6 +42,7 @@ from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
from .search import SearchStore
from .tags import TagsStore
+from .account_data import AccountDataStore
import logging
@@ -73,6 +74,7 @@ class DataStore(RoomMemberStore, RoomStore,
                EndToEndKeyStore,
                SearchStore,
                TagsStore,
+                AccountDataStore,
                ):

    def __init__(self, hs):
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
new file mode 100644
index 0000000000..d1829f84e8
--- /dev/null
+++ b/synapse/storage/account_data.py
@@ -0,0 +1,211 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from twisted.internet import defer
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AccountDataStore(SQLBaseStore):
+
+    def get_account_data_for_user(self, user_id):
+        """Get all the client account_data for a user.
+
+        Args:
+            user_id(str): The user to get the account_data for.
+        Returns:
+            A deferred pair of a dict of global account_data and a dict
+            mapping from room_id string to per room account_data dicts.
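+
+            For example, the resolved pair might look like this (hypothetical
+            values, shown for illustration only):
+
+                ({"org.example.client_config": {"theme": "dark"}},
+                 {"!room:example.com": {"org.example.note": {"text": "hi"}}})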
+ """
+
+ def get_account_data_for_user_txn(txn):
+ rows = self._simple_select_list_txn(
+ txn, "account_data", {"user_id": user_id},
+ ["account_data_type", "content"]
+ )
+
+ global_account_data = {
+ row["account_data_type"]: json.loads(row["content"]) for row in rows
+ }
+
+ rows = self._simple_select_list_txn(
+ txn, "room_account_data", {"user_id": user_id},
+ ["room_id", "account_data_type", "content"]
+ )
+
+ by_room = {}
+ for row in rows:
+ room_data = by_room.setdefault(row["room_id"], {})
+ room_data[row["account_data_type"]] = json.loads(row["content"])
+
+ return (global_account_data, by_room)
+
+ return self.runInteraction(
+ "get_account_data_for_user", get_account_data_for_user_txn
+ )
+
+    def get_account_data_for_room(self, user_id, room_id):
+        """Get all the client account_data for a user for a room.
+
+        Args:
+            user_id(str): The user to get the account_data for.
+            room_id(str): The room to get the account_data for.
+        Returns:
+            A deferred dict of the room account_data
+        """
+        def get_account_data_for_room_txn(txn):
+            rows = self._simple_select_list_txn(
+                txn, "room_account_data", {"user_id": user_id, "room_id": room_id},
+                ["account_data_type", "content"]
+            )
+
+            return {
+                row["account_data_type"]: json.loads(row["content"]) for row in rows
+            }
+
+        return self.runInteraction(
+            "get_account_data_for_room", get_account_data_for_room_txn
+        )
+
+    def get_updated_account_data_for_user(self, user_id, stream_id):
+        """Get all the client account_data for a user that has changed.
+
+        Args:
+            user_id(str): The user to get the account_data for.
+            stream_id(int): The point in the stream since which to get updates
+        Returns:
+            A deferred pair of a dict of global account_data and a dict
+            mapping from room_id string to per room account_data dicts.
+        """
+
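+        # Only rows written with a stream_id strictly greater than the given
+        # token are returned, so callers can pass the stream token from a
+        # previous sync to fetch just the delta.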
+        def get_updated_account_data_for_user_txn(txn):
+            sql = (
+                "SELECT account_data_type, content FROM account_data"
+                " WHERE user_id = ? AND stream_id > ?"
+            )
+
+            txn.execute(sql, (user_id, stream_id))
+
+            global_account_data = {
+                row[0]: json.loads(row[1]) for row in txn.fetchall()
+            }
+
+            sql = (
+                "SELECT room_id, account_data_type, content FROM room_account_data"
+                " WHERE user_id = ? AND stream_id > ?"
+            )
+
+            txn.execute(sql, (user_id, stream_id))
+
+            account_data_by_room = {}
+            for row in txn.fetchall():
+                room_account_data = account_data_by_room.setdefault(row[0], {})
+                room_account_data[row[1]] = json.loads(row[2])
+
+            return (global_account_data, account_data_by_room)
+
+        return self.runInteraction(
+            "get_updated_account_data_for_user", get_updated_account_data_for_user_txn
+        )
+
+    @defer.inlineCallbacks
+    def add_account_data_to_room(self, user_id, room_id, account_data_type, content):
+        """Add some account_data to a room for a user.
+
+        Args:
+            user_id(str): The user to add the account_data for.
+            room_id(str): The room to add the account_data for.
+            account_data_type(str): The type of account_data to add.
+            content(dict): A json object to associate with the account_data.
+        Returns:
+            A deferred that completes once the account_data has been added.
+        """
+        content_json = json.dumps(content)
+
+        def add_account_data_txn(txn, next_id):
+            self._simple_upsert_txn(
+                txn,
+                table="room_account_data",
+                keyvalues={
+                    "user_id": user_id,
+                    "room_id": room_id,
+                    "account_data_type": account_data_type,
+                },
+                values={
+                    "stream_id": next_id,
+                    "content": content_json,
+                }
+            )
+            self._update_max_stream_id(txn, next_id)
+
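+        # get_next() yields a context manager wrapping the next stream id;
+        # exiting the block is what marks the id as persisted for stream
+        # consumers (see the id generator for the exact semantics).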
+        with (yield self._account_data_id_gen.get_next(self)) as next_id:
+            yield self.runInteraction(
+                "add_room_account_data", add_account_data_txn, next_id
+            )
+
+        result = yield self._account_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def add_account_data_for_user(self, user_id, account_data_type, content):
+        """Add some global account_data for a user.
+
+        Args:
+            user_id(str): The user to add the account_data for.
+            account_data_type(str): The type of account_data to add.
+            content(dict): A json object to associate with the account_data.
+        Returns:
+            A deferred that completes once the account_data has been added.
+        """
+        content_json = json.dumps(content)
+
+        def add_account_data_txn(txn, next_id):
+            self._simple_upsert_txn(
+                txn,
+                table="account_data",
+                keyvalues={
+                    "user_id": user_id,
+                    "account_data_type": account_data_type,
+                },
+                values={
+                    "stream_id": next_id,
+                    "content": content_json,
+                }
+            )
+            self._update_max_stream_id(txn, next_id)
+
+        with (yield self._account_data_id_gen.get_next(self)) as next_id:
+            yield self.runInteraction(
+                "add_user_account_data", add_account_data_txn, next_id
+            )
+
+        result = yield self._account_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
+    def _update_max_stream_id(self, txn, next_id):
+        """Update the max stream_id
+
+        Args:
+            txn: The database cursor
+            next_id(int): The revision to advance to.
+        """
+        update_max_id_sql = (
+            "UPDATE account_data_max_stream_id"
+            " SET stream_id = ?"
+            " WHERE stream_id < ?"
+        )
+        txn.execute(update_max_id_sql, (next_id, next_id))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d35ca90b9..7088f2709b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
class EventsStore(SQLBaseStore):
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+
+    def __init__(self, hs):
+        super(EventsStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+        )
+
    @defer.inlineCallbacks
    def persist_events(self, events_and_contexts, backfilled=False,
                       is_new_state=True):
@@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore):
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
+ "origin_server_ts": int(event.origin_server_ts),
}
for event, _ in events_and_contexts
],
@@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore):
        ret = yield self.runInteraction("count_messages", _count_messages)
        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _background_reindex_origin_server_ts(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
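+        # Each batch walks backwards from the exclusive maximum towards the
+        # inclusive minimum; progress is rewritten after every batch so the
+        # update can resume where it left off.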
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
+            events = self._get_events_txn(txn, event_ids)
+
+            rows = []
+            for event in events:
+                try:
+                    event_id = event.event_id
+                    origin_server_ts = event.origin_server_ts
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                rows.append((origin_server_ts, event_id))
+
+            sql = (
+                "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
+                clump = rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+        defer.returnValue(result)
diff --git a/synapse/storage/schema/delta/26/account_data.sql b/synapse/storage/schema/delta/26/account_data.sql
index 3198a0d29c..48ad9cc6b8 100644
--- a/synapse/storage/schema/delta/26/account_data.sql
+++ b/synapse/storage/schema/delta/26/account_data.sql
@@ -15,3 +15,26 @@
ALTER TABLE private_user_data_max_stream_id RENAME TO account_data_max_stream_id;
+
+
+CREATE TABLE IF NOT EXISTS account_data(
+    user_id TEXT NOT NULL,
+    account_data_type TEXT NOT NULL, -- The type of the account_data.
+    stream_id BIGINT NOT NULL, -- The version of the account_data.
+    content TEXT NOT NULL, -- The JSON content of the account_data.
+    CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type)
+);
+
+
+CREATE TABLE IF NOT EXISTS room_account_data(
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    account_data_type TEXT NOT NULL, -- The type of the account_data.
+    stream_id BIGINT NOT NULL, -- The version of the account_data.
+    content TEXT NOT NULL, -- The JSON content of the account_data.
+    CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type)
+);
+
+
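+-- These indexes cover the (user_id, stream_id) lookups used to fetch
+-- account_data changes since a given stream position.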
+CREATE INDEX account_data_stream_id on account_data(user_id, stream_id);
+CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id);
diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/26/ts.py
new file mode 100644
index 0000000000..8d4a981975
--- /dev/null
+++ b/synapse/storage/schema/delta/26/ts.py
@@ -0,0 +1,57 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+ALTER_TABLE = (
+    "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;"
+    "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);"
+)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    for statement in get_statements(ALTER_TABLE.splitlines()):
+        cur.execute(statement)
+
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
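+    # The background update treats the maximum as an exclusive bound, hence
+    # the "+ 1" below. If the events table is empty there is nothing to
+    # backfill, so no background update row is inserted.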
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
+        sql = (
+            "INSERT INTO background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+
+        sql = database_engine.convert_param_style(sql)
+
+        cur.execute(sql, ("event_origin_server_ts", progress_json))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6386642df..20a62d07ff 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -212,11 +212,11 @@ class SearchStore(BackgroundUpdateStore):
            })

    @defer.inlineCallbacks
-    def search_room(self, room_id, search_term, keys, limit, pagination_token=None):
+    def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
        """Performs a full text search over events with given keys.

        Args:
-            room_id (str): The room_id to search in
+            room_ids (list): The room_ids to search in
            search_term (str): Search term to search for
            keys (list): List of keys to search in, currently supports
                "content.body", "content.name", "content.topic"
@@ -226,7 +226,15 @@ class SearchStore(BackgroundUpdateStore):
            list of dicts
        """
        clauses = []
-        args = [search_term, room_id]
+        args = [search_term]
+
+        # Make sure we don't explode because the person is in too many rooms.
+        # We filter the results below regardless.
+        if len(room_ids) < 500:
+            clauses.append(
+                "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+            )
+            args.extend(room_ids)
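+            # The clause built above expands to e.g. "room_id IN (?,?,?)"
+            # for three room_ids.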

        local_clauses = []
        for key in keys:
@@ -239,25 +247,25 @@ class SearchStore(BackgroundUpdateStore):
        if pagination_token:
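+            # The token packs "<origin_server_ts>,<stream_ordering>"; the
+            # first field was previously topological_ordering.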
            try:
-                topo, stream = pagination_token.split(",")
-                topo = int(topo)
+                origin_server_ts, stream = pagination_token.split(",")
+                origin_server_ts = int(origin_server_ts)
                stream = int(stream)
            except:
                raise SynapseError(400, "Invalid pagination token")

            clauses.append(
-                "(topological_ordering < ?"
-                " OR (topological_ordering = ? AND stream_ordering < ?))"
+                "(origin_server_ts < ?"
+                " OR (origin_server_ts = ? AND stream_ordering < ?))"
            )
-            args.extend([topo, topo, stream])
+            args.extend([origin_server_ts, origin_server_ts, stream])

        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "SELECT ts_rank_cd(vector, query) as rank,"
-                " topological_ordering, stream_ordering, room_id, event_id"
+                " origin_server_ts, stream_ordering, room_id, event_id"
                " FROM plainto_tsquery('english', ?) as query, event_search"
                " NATURAL JOIN events"
-                " WHERE vector @@ query AND room_id = ?"
+                " WHERE vector @@ query AND "
            )
        elif isinstance(self.database_engine, Sqlite3Engine):
            # We use CROSS JOIN here to ensure we use the right indexes.
@@ -270,24 +278,23 @@ class SearchStore(BackgroundUpdateStore):
            # MATCH unless it uses the full text search index
            sql = (
                "SELECT rank(matchinfo) as rank, room_id, event_id,"
-                " topological_ordering, stream_ordering"
+                " origin_server_ts, stream_ordering"
                " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
                " FROM event_search"
                " WHERE value MATCH ?"
                " )"
                " CROSS JOIN events USING (event_id)"
-                " WHERE room_id = ?"
+                " WHERE "
            )
        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")

-        for clause in clauses:
-            sql += " AND " + clause
+        sql += " AND ".join(clauses)

        # We add an arbitrary limit here to ensure we don't try to pull the
        # entire table from the database.
-        sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+        sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"

        args.append(limit)
@@ -295,6 +302,8 @@ class SearchStore(BackgroundUpdateStore):
"search_rooms", self.cursor_to_dict, sql, *args
)
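
+        # Filter in Python as well, to cover the case where there were too
+        # many room_ids to include in the IN clause above.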
+        results = filter(lambda row: row["room_id"] in room_ids, results)
+
        events = yield self._get_events([r["event_id"] for r in results])

        event_map = {
@@ -312,7 +321,7 @@ class SearchStore(BackgroundUpdateStore):
"event": event_map[r["event_id"]],
"rank": r["rank"],
"pagination_token": "%s,%s" % (
- r["topological_ordering"], r["stream_ordering"]
+ r["origin_server_ts"], r["stream_ordering"]
),
}
for r in results
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index f6d826cc59..f520f60c6c 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -48,8 +48,8 @@ class TagsStore(SQLBaseStore):
        Args:
            user_id(str): The user to get the tags for.
        Returns:
-            A deferred dict mapping from room_id strings to lists of tag
-            strings.
+            A deferred dict mapping from room_id strings to dicts mapping from
+            tag strings to tag content.
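+
+            For example (hypothetical values):
+
+                {"!room:example.com": {"u.work": {"order": 0.5}}}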
"""
deferred = self._simple_select_list(
|