summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/client_ips.py2
-rw-r--r--synapse/storage/devices.py2
-rw-r--r--synapse/storage/event_federation.py15
-rw-r--r--synapse/storage/event_push_actions.py4
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/storage/schema/delta/50/make_event_content_nullable.py92
-rw-r--r--synapse/storage/schema/full_schemas/16/event_edges.sql3
-rw-r--r--synapse/storage/schema/full_schemas/16/im.sql7
-rw-r--r--synapse/storage/stream.py14
-rw-r--r--synapse/storage/transactions.py4
10 files changed, 125 insertions, 20 deletions
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 77ae10da3d..b8cefd43d6 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 to_update,
             )
 
-        run_as_background_process(
+        return run_as_background_process(
             "update_client_ips", update,
         )
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 52dccb1507..c0943ecf91 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -712,7 +712,7 @@ class DeviceStore(SQLBaseStore):
 
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
-        run_as_background_process(
+        return run_as_background_process(
             "prune_old_outbound_device_pokes",
             self.runInteraction,
             "_prune_old_outbound_device_pokes",
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 14500ee59c..8bd35df119 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -114,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         sql = (
             "SELECT b.event_id, MAX(e.depth) FROM events as e"
             " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+            " ON g.event_id = e.event_id"
             " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+            " ON g.prev_event_id = b.event_id"
             " WHERE b.room_id = ? AND g.is_state is ?"
             " GROUP BY b.event_id"
         )
@@ -330,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             "SELECT depth, prev_event_id FROM event_edges"
             " INNER JOIN events"
             " ON prev_event_id = events.event_id"
-            " AND event_edges.room_id = events.room_id"
-            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " WHERE event_edges.event_id = ?"
             " AND event_edges.is_state = ?"
             " LIMIT ?"
         )
@@ -365,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
             txn.execute(
                 query,
-                (room_id, event_id, False, limit - len(event_results))
+                (event_id, False, limit - len(event_results))
             )
 
             for row in txn:
@@ -402,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+            "WHERE event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -411,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, False, limit - len(event_results))
+                    (event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn:
@@ -549,7 +548,7 @@ class EventFederationStore(EventFederationWorkerStore):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        run_as_background_process(
+        return run_as_background_process(
             "delete_old_forward_extrem_cache",
             self.runInteraction,
             "_delete_old_forward_extrem_cache",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4f44b0ad47..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -460,7 +460,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             )
 
     def _find_stream_orderings_for_times(self):
-        run_as_background_process(
+        return run_as_background_process(
             "event_push_action_stream_orderings",
             self.runInteraction,
             "_find_stream_orderings_for_times",
@@ -790,7 +790,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         """, (room_id, user_id, stream_ordering))
 
     def _start_rotate_notifs(self):
-        run_as_background_process("rotate_notifs", self._rotate_notifs)
+        return run_as_background_process("rotate_notifs", self._rotate_notifs)
 
     @defer.inlineCallbacks
     def _rotate_notifs(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a32a306495..c98e524ba1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -524,7 +524,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
             iterable=list(new_latest_event_ids),
             retcols=["prev_event_id"],
             keyvalues={
-                "room_id": room_id,
                 "is_state": False,
             },
             desc="_calculate_new_extremeties",
@@ -1193,7 +1192,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     "type": event.type,
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
-                    "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
                     "received_ts": self._clock.time_msec(),
                     "sender": event.sender,
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
new file mode 100644
index 0000000000..7d27342e39
--- /dev/null
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+We want to stop populating 'event.content', so we need to make it nullable.
+
+If this has to be rolled back, then the following should populate the missing data:
+
+Postgres:
+
+    UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+    WHERE ej.event_id = events.event_id AND
+        stream_ordering < (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering LIMIT 1
+        );
+
+    UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+    WHERE ej.event_id = events.event_id AND
+        stream_ordering > (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering DESC LIMIT 1
+        );
+
+SQLite:
+
+    UPDATE events SET content=(
+        SELECT json_extract(json,'$.content') FROM event_json ej
+        WHERE ej.event_id = events.event_id
+    )
+    WHERE
+        stream_ordering < (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering LIMIT 1
+        )
+        OR stream_ordering > (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering DESC LIMIT 1
+        );
+
+"""
+
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    pass
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute("""
+            ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
+        """)
+        return
+
+    # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
+
+    cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+    (oldsql,) = cur.fetchone()
+
+    sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
+    if sql == oldsql:
+        raise Exception("Couldn't find null constraint to drop in %s" % oldsql)
+
+    logger.info("Replacing definition of 'events' with: %s", sql)
+
+    cur.execute("PRAGMA schema_version")
+    (oldver,) = cur.fetchone()
+    cur.execute("PRAGMA writable_schema=ON")
+    cur.execute(
+        "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
+        (sql, ),
+    )
+    cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+    cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
index 52eec88357..6b5a5a88fa 100644
--- a/synapse/storage/schema/full_schemas/16/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT NOT NULL,
     prev_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    is_state BOOL NOT NULL,
+    is_state BOOL NOT NULL,  -- true if this is a prev_state edge rather than a regular
+                             -- event dag edge.
     UNIQUE (event_id, prev_event_id, room_id, is_state)
 );
 
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..5f5cb8d01d 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events(
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    content TEXT NOT NULL,
+
+    -- 'content' used to be created NULLable, but as of delta 50 we drop that constraint.
+    -- the hack we use to drop the constraint doesn't work for an in-memory sqlite
+    -- database, which breaks the sytests. Hence, we no longer make it nullable.
+    content TEXT,
+
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 9d85dbb1d6..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -527,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
 
     @defer.inlineCallbacks
-    def get_events_around(self, room_id, event_id, before_limit, after_limit):
+    def get_events_around(
+        self, room_id, event_id, before_limit, after_limit, event_filter=None,
+    ):
         """Retrieve events and pagination tokens around a given event in a
         room.
 
@@ -536,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             event_id (str)
             before_limit (int)
             after_limit (int)
+            event_filter (Filter|None)
 
         Returns:
             dict
@@ -543,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = yield self.runInteraction(
             "get_events_around", self._get_events_around_txn,
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter,
         )
 
         events_before = yield self._get_events(
@@ -563,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "end": results["after"]["token"],
         })
 
-    def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+    def _get_events_around_txn(
+        self, txn, room_id, event_id, before_limit, after_limit, event_filter,
+    ):
         """Retrieves event_ids and pagination tokens around a given event in a
         room.
 
@@ -572,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             event_id (str)
             before_limit (int)
             after_limit (int)
+            event_filter (Filter|None)
 
         Returns:
             dict
@@ -601,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         rows, start_token = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
+            event_filter=event_filter,
         )
         events_before = [r.event_id for r in rows]
 
         rows, end_token = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
+            event_filter=event_filter,
         )
         events_after = [r.event_id for r in rows]
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b4b479d94c..428e7fa36e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -273,7 +273,9 @@ class TransactionStore(SQLBaseStore):
         return self.cursor_to_dict(txn)
 
     def _start_cleanup_transactions(self):
-        run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+        return run_as_background_process(
+            "cleanup_transactions", self._cleanup_transactions,
+        )
 
     def _cleanup_transactions(self):
         now = self._clock.time_msec()