diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index a5a55742e0..b855811b98 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -71,7 +71,9 @@ class SynapseEvent(JsonEncodedObject):
"outlier",
"power_level",
"redacted",
- "prev_pdus",
+ "prev_events",
+ "hashes",
+ "signatures",
]
required_keys = [
diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py
index 991aae2a56..2cd591410b 100644
--- a/synapse/federation/pdu_codec.py
+++ b/synapse/federation/pdu_codec.py
@@ -47,7 +47,10 @@ class PduCodec(object):
kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin)
kwargs["room_id"] = pdu.context
kwargs["etype"] = pdu.pdu_type
- kwargs["prev_pdus"] = pdu.prev_pdus
+ kwargs["prev_events"] = [
+ encode_event_id(i, o)
+ for i, o in pdu.prev_pdus
+ ]
if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"):
kwargs["prev_state"] = encode_event_id(
@@ -78,8 +81,8 @@ class PduCodec(object):
d["context"] = event.room_id
d["pdu_type"] = event.type
- if hasattr(event, "prev_pdus"):
- d["prev_pdus"] = event.prev_pdus
+ if hasattr(event, "prev_events"):
+ d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events]
if hasattr(event, "prev_state"):
d["prev_state_id"], d["prev_state_origin"] = (
@@ -92,7 +95,7 @@ class PduCodec(object):
kwargs = copy.deepcopy(event.unrecognized_keys)
kwargs.update({
k: v for k, v in d.items()
- if k not in ["event_id", "room_id", "type"]
+ if k not in ["event_id", "room_id", "type", "prev_events"]
})
if "origin_server_ts" not in kwargs:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a50e19349a..678de0cf50 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,6 +40,7 @@ from .stream import StreamStore
from .pdu import StatePduStore, PduStore, PdusTable
from .transactions import TransactionStore
from .keys import KeyStore
+from .event_federation import EventFederationStore
from .state import StateStore
from .signatures import SignatureStore
@@ -69,6 +70,7 @@ SCHEMAS = [
"redactions",
"state",
"signatures",
+ "event_edges",
]
@@ -83,10 +85,12 @@ class _RollbackButIsFineException(Exception):
"""
pass
+
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, PduStore, StatePduStore, TransactionStore,
- DirectoryStore, KeyStore, StateStore, SignatureStore):
+ DirectoryStore, KeyStore, StateStore, SignatureStore,
+                EventFederationStore):
def __init__(self, hs):
super(DataStore, self).__init__(hs)
@@ -230,6 +234,10 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == RoomRedactionEvent.TYPE:
self._store_redaction(txn, event)
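+        # An event is an "outlier" if it was fetched out of band (e.g. as
+        # part of fetching remote state) rather than received in the normal
+        # flow; outliers must not become forward extremities.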
+ outlier = False
+ if hasattr(event, "outlier"):
+ outlier = event.outlier
+
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
@@ -237,20 +245,20 @@ class DataStore(RoomMemberStore, RoomStore,
"room_id": event.room_id,
"content": json.dumps(event.content),
"processed": True,
+ "outlier": outlier,
+ "depth": event.depth,
}
if stream_ordering is not None:
vals["stream_ordering"] = stream_ordering
- if hasattr(event, "outlier"):
- vals["outlier"] = event.outlier
- else:
- vals["outlier"] = False
-
unrec = {
k: v
for k, v in event.get_full_dict().items()
- if k not in vals.keys() and k not in ["redacted", "redacted_because"]
+ if k not in vals.keys() and k not in [
+ "redacted", "redacted_because", "signatures", "hashes",
+ "prev_events",
+ ]
}
vals["unrecognized_keys"] = json.dumps(unrec)
@@ -264,6 +272,14 @@ class DataStore(RoomMemberStore, RoomStore,
)
raise _RollbackButIsFineException("_persist_event")
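+        # Record how this event links to its prev_events in event_edges and
+        # update the room's forward/backward extremities accordingly.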
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
self._store_state_groups_txn(txn, event)
is_state = hasattr(event, "state_key") and event.state_key is not None
@@ -291,6 +307,28 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
+ signatures = event.signatures.get(event.origin, {})
+
+ for key_id, signature_base64 in signatures.items():
+ signature_bytes = decode_base64(signature_base64)
+ self._store_event_origin_signature_txn(
+                txn, event.event_id, event.origin, key_id, signature_bytes,
+ )
+
+ for prev_event_id, prev_hashes in event.prev_events:
+ for alg, hash_base64 in prev_hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_prev_event_hash_txn(
+ txn, event.event_id, prev_event_id, alg, hash_bytes
+ )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
+ self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
+
def _store_redaction(self, txn, event):
txn.execute(
"INSERT OR IGNORE INTO redactions "
@@ -373,7 +411,7 @@ class DataStore(RoomMemberStore, RoomStore,
"""
def _snapshot(txn):
membership_state = self._get_room_member(txn, user_id, room_id)
- prev_pdus = self._get_latest_pdus_in_context(
+ prev_events = self._get_latest_events_in_room(
txn, room_id
)
@@ -388,7 +426,7 @@ class DataStore(RoomMemberStore, RoomStore,
store=self,
room_id=room_id,
user_id=user_id,
- prev_pdus=prev_pdus,
+ prev_events=prev_events,
membership_state=membership_state,
state_type=state_type,
state_key=state_key,
@@ -404,7 +442,7 @@ class Snapshot(object):
store (DataStore): The datastore.
room_id (RoomId): The room of the snapshot.
user_id (UserId): The user this snapshot is for.
- prev_pdus (list): The list of PDU ids this snapshot is after.
+ prev_events (list): The list of event ids this snapshot is after.
membership_state (RoomMemberEvent): The current state of the user in
the room.
state_type (str, optional): State type captured by the snapshot
@@ -413,29 +451,29 @@ class Snapshot(object):
the previous value of the state type and key in the room.
"""
- def __init__(self, store, room_id, user_id, prev_pdus,
+ def __init__(self, store, room_id, user_id, prev_events,
membership_state, state_type=None, state_key=None,
prev_state_pdu=None):
self.store = store
self.room_id = room_id
self.user_id = user_id
- self.prev_pdus = prev_pdus
+ self.prev_events = prev_events
self.membership_state = membership_state
self.state_type = state_type
self.state_key = state_key
self.prev_state_pdu = prev_state_pdu
def fill_out_prev_events(self, event):
- if hasattr(event, "prev_pdus"):
+ if hasattr(event, "prev_events"):
return
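+        # prev_events carries (event_id, hashes) pairs; an event's depth is
+        # one more than the deepest of its prev_events, so depth increases
+        # along the DAG.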
-        event.prev_pdus = [
-            (p_id, origin, hashes)
-            for p_id, origin, hashes, _ in self.prev_pdus
-        ]
+        event.prev_events = [
+            (event_id, hashes)
+            for event_id, hashes, _ in self.prev_events
+        ]
-        if self.prev_pdus:
-            event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1
+        if self.prev_events:
+            event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
else:
event.depth = 0
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1192216971..30732caa83 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -193,7 +193,6 @@ class SQLBaseStore(object):
table, keyvalues, retcols=retcols, allow_none=allow_none
)
- @defer.inlineCallbacks
def _simple_select_one_onecol(self, table, keyvalues, retcol,
allow_none=False):
"""Executes a SELECT query on the named table, which is expected to
@@ -204,19 +203,41 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the row with
retcol : string giving the name of the column to return
"""
- ret = yield self._simple_select_one(
+ return self.runInteraction(
+ "_simple_select_one_onecol_txn",
+ self._simple_select_one_onecol_txn,
+ table, keyvalues, retcol, allow_none=allow_none,
+ )
+
+ def _simple_select_one_onecol_txn(self, txn, table, keyvalues, retcol,
+ allow_none=False):
+ ret = self._simple_select_onecol_txn(
+ txn,
table=table,
keyvalues=keyvalues,
- retcols=[retcol],
- allow_none=allow_none
+            retcol=retcol,
)
if ret:
- defer.returnValue(ret[retcol])
+            return ret[0]
else:
- defer.returnValue(None)
+ if allow_none:
+ return None
+ else:
+ raise StoreError(404, "No row found")
+
+ def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
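+        """Fetch the named column from every row of `table` matching
+        `keyvalues`; returns a list of values.
+        """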
+ sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % {
+ "retcol": retcol,
+ "table": table,
+ "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+ }
+
+ txn.execute(sql, keyvalues.values())
+
+ return [r[0] for r in txn.fetchall()]
+
- @defer.inlineCallbacks
def _simple_select_onecol(self, table, keyvalues, retcol):
"""Executes a SELECT query on the named table, which returns a list
comprising of the values of the named column from the selected rows.
@@ -229,19 +250,11 @@ class SQLBaseStore(object):
Returns:
Deferred: Results in a list
"""
- sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % {
- "retcol": retcol,
- "table": table,
- "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
- }
-
- def func(txn):
- txn.execute(sql, keyvalues.values())
- return txn.fetchall()
-
- res = yield self.runInteraction("_simple_select_onecol", func)
-
- defer.returnValue([r[0] for r in res])
+ return self.runInteraction(
+ "_simple_select_onecol",
+ self._simple_select_onecol_txn,
+ table, keyvalues, retcol
+ )
def _simple_select_list(self, table, keyvalues, retcols):
"""Executes a SELECT query on the named table, which may return zero or
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
new file mode 100644
index 0000000000..27ad9aea4d
--- /dev/null
+++ b/synapse/storage/event_federation.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from syutil.base64util import encode_base64
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class EventFederationStore(SQLBaseStore):
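+    """Keeps track of what the server knows about the room DAG: the edges
+    between events (event_edges), the events at the frontier of our view
+    of the room (event_forward_extremities / event_backward_extremities),
+    and how far back that view reaches (room_depth).
+    """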
+
+    def _get_latest_events_in_room(self, txn, room_id):
+        sql = (
+            "SELECT e.event_id, e.depth FROM events as e "
+            "INNER JOIN event_forward_extremities as f "
+            "ON e.event_id = f.event_id "
+            "WHERE f.room_id = ?"
+        )
+
+        txn.execute(sql, (room_id, ))
+
+        results = []
+        for event_id, depth in txn.fetchall():
+            hashes = self._get_event_reference_hashes_txn(txn, event_id)
+            prev_hashes = {"sha256": encode_base64(hashes["sha256"])}
+            results.append((event_id, prev_hashes, depth))
+
+        return results
+
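+    # room_depth.min_depth is the shallowest event depth we hold for a
+    # room, i.e. a measure of how far back our copy of the DAG reaches.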
+ def _get_min_depth_interaction(self, txn, room_id):
+ min_depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="room_depth",
+ keyvalues={"room_id": room_id,},
+ retcol="min_depth",
+ allow_none=True,
+ )
+
+ return int(min_depth) if min_depth is not None else None
+
+ def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+ min_depth = self._get_min_depth_interaction(txn, room_id)
+
+        do_insert = depth < min_depth if min_depth is not None else True
+
+ if do_insert:
+ self._simple_insert_txn(
+ txn,
+ table="room_depth",
+ values={
+ "room_id": room_id,
+ "min_depth": depth,
+ },
+ or_replace=True,
+ )
+
+ def _handle_prev_events(self, txn, outlier, event_id, prev_events,
+ room_id):
+        for e_id, _ in prev_events:
+ # TODO (erikj): This could be done as a bulk insert
+ self._simple_insert_txn(
+ txn,
+ table="event_edges",
+ values={
+ "event_id": event_id,
+ "prev_event": e_id,
+ "room_id": room_id,
+ }
+ )
+
+ # Update the extremities table if this is not an outlier.
+ if not outlier:
+            for e_id, _ in prev_events:
+ # TODO (erikj): This could be done as a bulk insert
+ self._simple_delete_txn(
+ txn,
+ table="event_forward_extremities",
+ keyvalues={
+ "event_id": e_id,
+ "room_id": room_id,
+ }
+ )
+
+            # Insert the new event as a forward extremity only if no other
+            # event we have already references it as a prev event.
+ query = (
+ "INSERT INTO %(table)s (event_id, room_id) "
+ "SELECT ?, ? WHERE NOT EXISTS ("
+ "SELECT 1 FROM %(event_edges)s WHERE "
+ "prev_event_id = ? "
+ ")"
+ ) % {
+ "table": "event_forward_extremities",
+ "event_edges": "event_edges",
+ }
+
+ logger.debug("query: %s", query)
+
+ txn.execute(query, (event_id, room_id, event_id))
+
+            # Optimistically insert all the prev_events as backward
+            # extremities; any we have already seen are removed again below.
+            for e_id, _ in prev_events:
+ # TODO (erikj): This could be done as a bulk insert
+ self._simple_insert_txn(
+ txn,
+ table="event_backward_extremities",
+ values={
+ "event_id": e_id,
+ "room_id": room_id,
+ }
+ )
+
+            # Also remove from the backward extremities table any events
+            # that we have now actually seen (i.e. that are not outliers).
+            query = (
+                "DELETE FROM event_backward_extremities WHERE EXISTS ("
+                "SELECT 1 FROM events "
+                "WHERE "
+                "event_backward_extremities.event_id = events.event_id "
+                "AND not events.outlier "
+                ")"
+            )
+            txn.execute(query)
diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql
new file mode 100644
index 0000000000..6a28314ece
--- /dev/null
+++ b/synapse/storage/schema/event_edges.sql
@@ -0,0 +1,51 @@
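+-- Events at the front of the room DAG: events we hold that nothing else
+-- references as a prev event. These become the prev_events of the next
+-- event sent in the room.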
+CREATE TABLE IF NOT EXISTS event_forward_extremities(
+ event_id TEXT,
+ room_id TEXT,
+ CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+);
+
+CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
+--
+
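+-- Events referenced as a prev event that we do not ourselves hold; this
+-- is the boundary from which backfill requests are made.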
+CREATE TABLE IF NOT EXISTS event_backward_extremities(
+ event_id TEXT,
+ room_id TEXT,
+ CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+);
+
+CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
+--
+
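+-- One row per (event, prev_event) edge in the room DAG.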
+CREATE TABLE IF NOT EXISTS event_edges(
+ event_id TEXT,
+ prev_event_id TEXT,
+ room_id TEXT,
+ CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id)
+);
+
+CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
+CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
+--
+
+
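+-- The shallowest event depth we hold for each room, i.e. how far back
+-- our copy of the room DAG reaches.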
+CREATE TABLE IF NOT EXISTS room_depth(
+ room_id TEXT,
+ min_depth INTEGER,
+ CONSTRAINT uniqueness UNIQUE (room_id)
+);
+
+CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
+--
+
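+-- Per-destination delivery bookkeeping for events we need to send over
+-- federation.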
+CREATE TABLE IF NOT EXISTS event_destinations(
+ event_id TEXT,
+ destination TEXT,
+    delivered_ts INTEGER DEFAULT 0, -- UNIX timestamp of delivery, or 0 if not delivered
+ CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
+);
+
+CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
+--
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
new file mode 100644
index 0000000000..5491c7ecec
--- /dev/null
+++ b/synapse/storage/schema/event_signatures.sql
@@ -0,0 +1,65 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
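+-- Hashes of the content of each event, one row per algorithm.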
+CREATE TABLE IF NOT EXISTS event_content_hashes (
+ event_id TEXT,
+ algorithm TEXT,
+ hash BLOB,
+ CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
+ event_id
+);
+
+
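+-- The hashes by which other events refer to this event, one row per
+-- algorithm.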
+CREATE TABLE IF NOT EXISTS event_reference_hashes (
+ event_id TEXT,
+ algorithm TEXT,
+ hash BLOB,
+ CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
+ event_id
+);
+
+
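+-- Signatures over each event by its origin server, one row per key.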
+CREATE TABLE IF NOT EXISTS event_origin_signatures (
+ event_id TEXT,
+ origin TEXT,
+ key_id TEXT,
+ signature BLOB,
+ CONSTRAINT uniqueness UNIQUE (event_id, key_id)
+);
+
+CREATE INDEX IF NOT EXISTS event_origin_signatures_id ON event_origin_signatures (
+ event_id
+);
+
+
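+-- The hashes an event claims for each of its prev_events.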
+CREATE TABLE IF NOT EXISTS event_edge_hashes(
+ event_id TEXT,
+ prev_event_id TEXT,
+ algorithm TEXT,
+ hash BLOB,
+ CONSTRAINT uniqueness UNIQUE (
+ event_id, prev_event_id, algorithm
+ )
+);
+
+CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
+ event_id
+);
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 3aa83f5c8c..8d6f655993 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS events(
unrecognized_keys TEXT,
processed BOOL NOT NULL,
outlier BOOL NOT NULL,
+ depth INTEGER DEFAULT 0 NOT NULL,
CONSTRAINT ev_uniq UNIQUE (event_id)
);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 82be946d3f..b8f8fd44cb 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -153,3 +153,130 @@ class SignatureStore(SQLBaseStore):
"algorithm": algorithm,
"hash": buffer(hash_bytes),
})
+
+ ## Events ##
+
+ def _get_event_content_hashes_txn(self, txn, event_id):
+ """Get all the hashes for a given Event.
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ Returns:
+ A dict of algorithm -> hash.
+ """
+ query = (
+ "SELECT algorithm, hash"
+ " FROM event_content_hashes"
+ " WHERE event_id = ?"
+ )
+ txn.execute(query, (event_id, ))
+ return dict(txn.fetchall())
+
+ def _store_event_content_hash_txn(self, txn, event_id, algorithm,
+ hash_bytes):
+ """Store a hash for a Event
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ algorithm (str): Hashing algorithm.
+ hash_bytes (bytes): Hash function output bytes.
+ """
+ self._simple_insert_txn(txn, "event_content_hashes", {
+ "event_id": event_id,
+ "algorithm": algorithm,
+ "hash": buffer(hash_bytes),
+ })
+
+ def _get_event_reference_hashes_txn(self, txn, event_id):
+ """Get all the hashes for a given PDU.
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ Returns:
+ A dict of algorithm -> hash.
+ """
+ query = (
+ "SELECT algorithm, hash"
+ " FROM event_reference_hashes"
+ " WHERE event_id = ?"
+ )
+ txn.execute(query, (event_id, ))
+ return dict(txn.fetchall())
+
+ def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
+ hash_bytes):
+ """Store a hash for a PDU
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ algorithm (str): Hashing algorithm.
+ hash_bytes (bytes): Hash function output bytes.
+ """
+ self._simple_insert_txn(txn, "event_reference_hashes", {
+ "event_id": event_id,
+ "algorithm": algorithm,
+ "hash": buffer(hash_bytes),
+ })
+
+ def _get_event_origin_signatures_txn(self, txn, event_id):
+ """Get all the signatures for a given PDU.
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ Returns:
+ A dict of key_id -> signature_bytes.
+ """
+ query = (
+ "SELECT key_id, signature"
+ " FROM event_origin_signatures"
+ " WHERE event_id = ? "
+ )
+ txn.execute(query, (event_id, ))
+ return dict(txn.fetchall())
+
+ def _store_event_origin_signature_txn(self, txn, event_id, origin, key_id,
+ signature_bytes):
+ """Store a signature from the origin server for a PDU.
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ origin (str): origin of the Event.
+ key_id (str): Id for the signing key.
+            signature_bytes (bytes): The signature.
+ """
+ self._simple_insert_txn(txn, "event_origin_signatures", {
+ "event_id": event_id,
+ "origin": origin,
+ "key_id": key_id,
+ "signature": buffer(signature_bytes),
+ })
+
+ def _get_prev_event_hashes_txn(self, txn, event_id):
+ """Get all the hashes for previous PDUs of a PDU
+ Args:
+ txn (cursor):
+ event_id (str): Id for the Event.
+ Returns:
+            A dict of prev_event_id -> dict of algorithm -> hash_bytes.
+ """
+ query = (
+ "SELECT prev_event_id, algorithm, hash"
+ " FROM event_edge_hashes"
+ " WHERE event_id = ?"
+ )
+ txn.execute(query, (event_id, ))
+ results = {}
+ for prev_event_id, algorithm, hash_bytes in txn.fetchall():
+ hashes = results.setdefault(prev_event_id, {})
+ hashes[algorithm] = hash_bytes
+ return results
+
+ def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
+ algorithm, hash_bytes):
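+        """Store a hash that an Event claims for one of its prev_events.
+        Args:
+            txn (cursor):
+            event_id (str): Id for the Event.
+            prev_event_id (str): Id of a prev event of event_id.
+            algorithm (str): Hashing algorithm.
+            hash_bytes (bytes): Hash function output bytes.
+        """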
+ self._simple_insert_txn(txn, "event_edge_hashes", {
+ "event_id": event_id,
+ "prev_event_id": prev_event_id,
+ "algorithm": algorithm,
+ "hash": buffer(hash_bytes),
+ })
|