diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 8d33def6c6..8f812f0fd7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -329,13 +329,14 @@ class SQLBaseStore(object):
self.database_engine = hs.database_engine
- self._stream_id_gen = StreamIdGenerator()
+ self._stream_id_gen = StreamIdGenerator("events", "stream_ordering")
self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
self._pushers_id_gen = IdGenerator("pushers", "id", self)
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+ self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id")
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
new file mode 100644
index 0000000000..0168e74a0d
--- /dev/null
+++ b/synapse/storage/receipts.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore, cached
+
+from twisted.internet import defer
+
+
+class ReceiptStore(SQLBaseStore):
+
+ @cached
+ @defer.inlineCallbacks
+ def get_linearized_receipts_for_room(self, room_id):
+ rows = yield self._simple_select_list(
+ table="receipts_linearized",
+ keyvalues={"room_id": room_id},
+ retcols=["receipt_type", "user_id", "event_id"],
+ desc="get_linearized_receipts_for_room",
+ )
+
+ result = {}
+ for row in rows:
+ result.setdefault(
+ row["event_id"], {}
+ ).setdefault(
+ row["receipt_type"], []
+ ).append(row["user_id"])
+
+ defer.returnValue(result)
+
+ @cached
+ @defer.inlineCallbacks
+ def get_graph_receipts_for_room(self, room_id):
+ rows = yield self._simple_select_list(
+ table="receipts_graph",
+ keyvalues={"room_id": room_id},
+ retcols=["receipt_type", "user_id", "event_id"],
+ desc="get_linearized_receipts_for_room",
+ )
+
+ result = {}
+ for row in rows:
+ result.setdefault(
+ row["user_id"], {}
+ ).setdefault(
+ row["receipt_type"], []
+ ).append(row["event_id"])
+
+ defer.returnValue(result)
+
+ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
+ user_id, event_id, stream_id):
+ self._simple_delete_txn(
+ txn,
+ table="receipts_linearized",
+ keyvalues={
+ "stream_id": stream_id,
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ }
+ )
+
+ self._simple_insert_txn(
+ txn,
+ table="receipts_linearized",
+ values={
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ "event_id": event_id,
+ }
+ )
+
+ @defer.inlineCallbacks
+ def insert_receipt(self, room_id, receipt_type, user_id, event_ids):
+ if not event_ids:
+ return
+
+ if len(event_ids) == 1:
+ linearized_event_id = event_ids[0]
+ else:
+ # we need to points in graph -> linearized form.
+ def graph_to_linear(txn):
+ query = (
+ "SELECT event_id WHERE room_id = ? AND stream_ordering IN ("
+ " SELECT max(stream_ordering) WHERE event_id IN (%s)"
+ ")"
+ ) % (",".join(["?"] * len(event_ids)))
+
+ txn.execute(query, [room_id] + event_ids)
+ rows = txn.fetchall()
+ if rows:
+ return rows[0][0]
+ else:
+ # TODO: ARGH?!
+ return None
+
+ linearized_event_id = yield self.runInteraction(
+ graph_to_linear, desc="insert_receipt_conv"
+ )
+
+ stream_id_manager = yield self._stream_id_gen.get_next(self)
+ with stream_id_manager() as stream_id:
+ yield self.runInteraction(
+ self.insert_linearized_receipt_txn,
+ room_id, receipt_type, user_id, linearized_event_id,
+ stream_id=stream_id,
+ desc="insert_linearized_receipt"
+ )
+
+ yield self.insert_graph_receipt(
+ room_id, receipt_type, user_id, event_ids
+ )
+
+ max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+ defer.returnValue((stream_id, max_persisted_id))
+
+ def insert_graph_receipt(self, room_id, receipt_type,
+ user_id, event_ids):
+ return self.runInteraction(
+ self.insert_graph_receipt_txn,
+ room_id, receipt_type, user_id, event_ids,
+ desc="insert_graph_receipt"
+ )
+
+ def insert_graph_receipt_txn(self, txn, room_id, receipt_type,
+ user_id, event_ids):
+ self._simple_delete_txn(
+ txn,
+ table="receipts_graph",
+ keyvalues={
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ }
+ )
+ self._simple_insert_many_txn(
+ txn,
+ table="receipts_graph",
+ values=[
+ {
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ "event_id": event_id,
+ }
+ for event_id in event_ids
+ ],
+ )
diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql
index da9e18e903..ccd64ec7f4 100644
--- a/synapse/storage/schema/delta/21/receipts.sql
+++ b/synapse/storage/schema/delta/21/receipts.sql
@@ -1,16 +1,18 @@
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
CREATE TABLE IF NOT EXISTS receipts_graph(
room_id TEXT NOT NULL,
@@ -24,12 +26,13 @@ CREATE INDEX receipts_graph_room_tuple ON receipts_graph(
);
CREATE TABLE IF NOT EXISTS receipts_linearized (
+ stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL
);
-CREATE INDEX receipts_graph_room_tuple ON receipts_graph(
+CREATE INDEX receipts_linearized_room_tuple ON receipts_graph(
room_id, receipt_type, user_id
);
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 89d1643f10..b39006315d 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -72,7 +72,10 @@ class StreamIdGenerator(object):
with stream_id_gen.get_next_txn(txn) as stream_id:
# ... persist event ...
"""
- def __init__(self):
+ def __init__(self, table, column):
+ self.table = table
+ self.column = column
+
self._lock = threading.Lock()
self._current_max = None
@@ -126,7 +129,7 @@ class StreamIdGenerator(object):
def _get_or_compute_current_max(self, txn):
with self._lock:
- txn.execute("SELECT MAX(stream_ordering) FROM events")
+ txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table))
rows = txn.fetchall()
val, = rows[0]
|