summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-07-20 10:40:48 +0100
committerGitHub <noreply@github.com>2016-07-20 10:40:48 +0100
commit1e2a7f18a1d7858f37ec43bda7effc3f4d053997 (patch)
tree3363706b3d89bed1d08c51bfd4455351941be459 /synapse/storage
parentMerge pull request #931 from matrix-org/rav/refactor_register (diff)
parentComment (diff)
downloadsynapse-1e2a7f18a1d7858f37ec43bda7effc3f4d053997.tar.xz
Merge pull request #922 from matrix-org/erikj/file_api2
Feature: Add filter to /messages. Add 'contains_url' to filter.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py82
-rw-r--r--synapse/storage/schema/delta/33/event_fields.py60
-rw-r--r--synapse/storage/stream.py56
3 files changed, 197 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9d74fd159d..6610549281 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -152,6 +152,7 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
     def __init__(self, hs):
         super(EventsStore, self).__init__(hs)
@@ -159,6 +160,10 @@ class EventsStore(SQLBaseStore):
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+            self._background_reindex_fields_sender,
+        )
 
         self._event_persist_queue = _EventPeristenceQueue()
 
@@ -576,6 +581,11 @@ class EventsStore(SQLBaseStore):
                     "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
                     "received_ts": self._clock.time_msec(),
+                    "sender": event.sender,
+                    "contains_url": (
+                        "url" in event.content
+                        and isinstance(event.content["url"], basestring)
+                    ),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -1116,6 +1126,78 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
+    def _background_reindex_fields_sender(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id, json FROM events"
+                " INNER JOIN event_json USING (event_id)"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+
+            update_rows = []
+            for row in rows:
+                try:
+                    event_id = row[1]
+                    event_json = json.loads(row[2])
+                    sender = event_json["sender"]
+                    content = event_json["content"]
+
+                    contains_url = "url" in content
+                    if contains_url:
+                        contains_url &= isinstance(content["url"], basestring)
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                update_rows.append((sender, contains_url, event_id))
+
+            sql = (
+                "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+                clump = update_rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
     def _background_reindex_origin_server_ts(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py
new file mode 100644
index 0000000000..83066cccc9
--- /dev/null
+++ b/synapse/storage/schema/delta/33/event_fields.py
@@ -0,0 +1,60 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.prepare_database import get_statements
+
+import logging
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+ALTER_TABLE = """
+ALTER TABLE events ADD COLUMN sender TEXT;
+ALTER TABLE events ADD COLUMN contains_url BOOLEAN;
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    for statement in get_statements(ALTER_TABLE.splitlines()):
+        cur.execute(statement)
+
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+
+        sql = database_engine.convert_param_style(sql)
+
+        cur.execute(sql, ("event_fields_sender_url", progress_json))
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index c33ac5a8d7..862c5c3ea1 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -95,6 +95,54 @@ def upper_bound(token, engine, inclusive=True):
         )
 
 
+def filter_to_clause(event_filter):
+    # NB: This may create SQL clauses that don't optimise well (and we don't
+    # have indices on all possible clauses). E.g. it may create
+    # "room_id == X AND room_id != X", which postgres doesn't optimise.
+
+    if not event_filter:
+        return "", []
+
+    clauses = []
+    args = []
+
+    if event_filter.types:
+        clauses.append(
+            "(%s)" % " OR ".join("type = ?" for _ in event_filter.types)
+        )
+        args.extend(event_filter.types)
+
+    for typ in event_filter.not_types:
+        clauses.append("type != ?")
+        args.append(typ)
+
+    if event_filter.senders:
+        clauses.append(
+            "(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders)
+        )
+        args.extend(event_filter.senders)
+
+    for sender in event_filter.not_senders:
+        clauses.append("sender != ?")
+        args.append(sender)
+
+    if event_filter.rooms:
+        clauses.append(
+            "(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms)
+        )
+        args.extend(event_filter.rooms)
+
+    for room_id in event_filter.not_rooms:
+        clauses.append("room_id != ?")
+        args.append(room_id)
+
+    if event_filter.contains_url:
+        clauses.append("contains_url = ?")
+        args.append(event_filter.contains_url)
+
+    return " AND ".join(clauses), args
+
+
 class StreamStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -320,7 +368,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1):
+                             direction='b', limit=-1, event_filter=None):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -344,6 +392,12 @@ class StreamStore(SQLBaseStore):
                     RoomStreamToken.parse(to_key), self.database_engine
                 ))
 
+        filter_clause, filter_args = filter_to_clause(event_filter)
+
+        if filter_clause:
+            bounds += " AND " + filter_clause
+            args.extend(filter_args)
+
         if int(limit) > 0:
             args.append(int(limit))
             limit_str = " LIMIT ?"