summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-17 15:46:44 +0000
committerErik Johnston <erik@matrix.org>2016-11-17 15:48:04 +0000
commitf8ee66250a16cb9dd3af01fb1150ff18cfebbc39 (patch)
tree9920bd4e8164f705b4e27c714d6c053082dcf7a5 /synapse/storage
parentHook up the send queue and create a federation sender worker (diff)
downloadsynapse-f8ee66250a16cb9dd3af01fb1150ff18cfebbc39.tar.xz
Handle sending events and device messages over federation
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/deviceinbox.py26
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/schema/delta/39/device_federation_stream_idx.sql16
-rw-r--r--synapse/storage/stream.py31
4 files changed, 62 insertions, 13 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e73714..87398d60bc 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore):
             return defer.succeed([])
 
         def get_all_new_device_messages_txn(txn):
+            # We limit like this as we might have multiple rows per stream_id, and
+            # we want to make sure we always get all entries for any stream_id
+            # we return.
+            upper_pos = min(current_pos, last_pos + limit)
             sql = (
-                "SELECT stream_id FROM device_inbox"
+                "SELECT stream_id, user_id"
+                " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " GROUP BY stream_id"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_pos, current_pos, limit))
-            stream_ids = txn.fetchall()
-            if not stream_ids:
-                return []
-            max_stream_id_in_limit = stream_ids[-1]
+            txn.execute(sql, (last_pos, upper_pos))
+            rows = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, user_id, device_id, message_json"
-                " FROM device_inbox"
+                "SELECT stream_id, destination"
+                " FROM device_federation_outbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
             )
-            txn.execute(sql, (last_pos, max_stream_id_in_limit))
-            return txn.fetchall()
+            txn.execute(sql, (last_pos, upper_pos))
+            rows.extend(txn.fetchall())
+
+            return rows
 
         return self.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6576a30098..e46ae6502e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 38
+SCHEMA_VERSION = 39
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
new file mode 100644
index 0000000000..00be801e90
--- /dev/null
+++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 888b1cb35d..f34cb78f9a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -765,3 +765,34 @@ class StreamStore(SQLBaseStore):
                 "token": end_token,
             },
         }
+
+    @defer.inlineCallbacks
+    def get_all_new_events_stream(self, from_id, current_id, limit):
+        """Get all new events"""
+
+        def get_all_new_events_stream_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, e.event_id"
+                " FROM events AS e"
+                " WHERE"
+                " ? < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (from_id, current_id, limit))
+            rows = txn.fetchall()
+
+            upper_bound = current_id
+            if len(rows) == limit:
+                upper_bound = rows[-1][0]
+
+            return upper_bound, [row[1] for row in rows]
+
+        upper_bound, event_ids = yield self.runInteraction(
+            "get_all_new_events_stream", get_all_new_events_stream_txn,
+        )
+
+        events = yield self._get_events(event_ids)
+
+        defer.returnValue((upper_bound, events))