summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-21 11:28:37 +0000
committerErik Johnston <erik@matrix.org>2016-11-21 11:33:08 +0000
commit7c9cdb22453d1a442e5c280149aeeff4d46da215 (patch)
tree1434dca32f57320810b5e70314db521f5cc7a338 /synapse/storage
parentHandle sending events and device messages over federation (diff)
downloadsynapse-7c9cdb22453d1a442e5c280149aeeff4d46da215.tar.xz
Store federation stream positions in the database
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py18
-rw-r--r--synapse/storage/schema/delta/39/federation_out_position.sql22
-rw-r--r--synapse/storage/stream.py16
3 files changed, 52 insertions, 4 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d..d3686b9690 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+        if keyvalues:
+            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+            "SELECT %(retcol)s FROM %(table)s %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
-            "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+            "where": where,
         }
 
         txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
-        update_sql = "UPDATE %s SET %s WHERE %s" % (
+        if keyvalues:
+            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
+        update_sql = "UPDATE %s SET %s %s" % (
             table,
             ", ".join("%s = ?" % (k,) for k in updatevalues),
-            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+            where,
         )
 
         txn.execute(
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 0000000000..edbd8e132f
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE federation_stream_position(
+     type TEXT NOT NULL,
+     stream_id INTEGER NOT NULL
+ );
+
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index f34cb78f9a..7fa63b58a7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore):
         events = yield self._get_events(event_ids)
 
         defer.returnValue((upper_bound, events))
+
+    def get_federation_out_pos(self, typ):
+        return self._simple_select_one_onecol(
+            table="federation_stream_position",
+            retcol="stream_id",
+            keyvalues={"type": typ},
+            desc="get_federation_out_pos"
+        )
+
+    def update_federation_out_pos(self, typ, stream_id):
+        return self._simple_update_one(
+            table="federation_stream_position",
+            keyvalues={"type": typ},
+            updatevalues={"stream_id": stream_id},
+            desc="update_federation_out_pos",
+        )