From 3dac27a8a9846a892284971f71e05c2440225484 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 14 Oct 2014 14:54:26 +0100 Subject: Storage for pdu signatures --- synapse/storage/signatures.py | 90 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 synapse/storage/signatures.py (limited to 'synapse/storage/signatures.py') diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py new file mode 100644 index 0000000000..bb860f09f0 --- /dev/null +++ b/synapse/storage/signatures.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore + +from twisted.internet import defer + + +class SignatureStore(SQLBaseStore): + """Persistence for PDU signatures and hashes""" + + def _get_pdu_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for a given PDU. + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM pdu_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + return dict(txn.fetchall()) + + def _store_pdu_hash_txn(self, txn, pdu_id, origin, algorithm, hash_bytes): + """Store a hash for a PDU + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(self, txn, "pdu_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + def _get_pdu_origin_signatures_txn(self, txn, pdu_id, origin): + """Get all the signatures for a given PDU. + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + Returns: + A dict of key_id -> signature_bytes. + """ + query = ( + "SELECT key_id, signature" + " FROM pdu_origin_signatures" + " WHERE WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + return dict(txn.fetchall()) + + def _store_pdu_origin_signature_txn(self, txn, pdu_id, origin, key_id, + signature_bytes): + """Store a signature from the origin server for a PDU. + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + key_id (str): Id for the signing key. + signature (bytes): The signature. + """ + self._simple_insert_txn(self, txn, "pdu_origin_signatures", { + "pdu_id": pdu_id, + "origin": origin, + "key_id": key_id, + "signature": buffer(signature_bytes), + }) + -- cgit 1.4.1 From 66104da10c4191aa1e048f2379190574755109e6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Oct 2014 00:09:48 +0100 Subject: Sign outgoing PDUs. --- synapse/crypto/event_signing.py | 4 ++-- synapse/federation/pdu_codec.py | 6 +++++- synapse/storage/__init__.py | 7 ++++--- synapse/storage/signatures.py | 6 +++--- tests/federation/test_pdu_codec.py | 13 ++++++++++--- tests/rest/test_events.py | 7 +++++-- tests/rest/test_profile.py | 8 ++++++-- tests/rest/test_rooms.py | 32 +++++++++++++++++++++++++------- tests/utils.py | 3 ++- 9 files changed, 62 insertions(+), 24 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 6557727e06..a115967c0a 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -15,6 +15,7 @@ # limitations under the License. +from synapse.federation.units import Pdu from synapse.api.events.utils import prune_pdu from syutil.jsonutil import encode_canonical_json from syutil.base64util import encode_base64, decode_base64 @@ -25,8 +26,7 @@ import hashlib def hash_event_pdu(pdu, hash_algortithm=hashlib.sha256): hashed = _compute_hash(pdu, hash_algortithm) - hashes[hashed.name] = encode_base64(hashed.digest()) - pdu.hashes = hashes + pdu.hashes[hashed.name] = encode_base64(hashed.digest()) return pdu diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index cef61108dd..bcac5f9ae8 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,6 +14,7 @@ # limitations under the License. from .units import Pdu +from synapse.crypto.event_signing import hash_event_pdu, sign_event_pdu import copy @@ -33,6 +34,7 @@ def encode_event_id(pdu_id, origin): class PduCodec(object): def __init__(self, hs): + self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.event_factory = hs.get_event_factory() self.clock = hs.get_clock() @@ -99,4 +101,6 @@ class PduCodec(object): if "ts" not in kwargs: kwargs["ts"] = int(self.clock.time_msec()) - return Pdu(**kwargs) + pdu = Pdu(**kwargs) + pdu = hash_event_pdu(pdu) + return sign_event_pdu(pdu, self.server_name, self.signing_key) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index bfeab7d1e8..b2a3f0b56c 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -42,6 +42,7 @@ from .transactions import TransactionStore from .keys import KeyStore from .signatures import SignatureStore +from syutil.base64util import decode_base64 import json import logging @@ -168,11 +169,11 @@ class DataStore(RoomMemberStore, RoomStore, txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, ) - signatures = pdu.sigatures.get(pdu.orgin, {}) + signatures = pdu.signatures.get(pdu.origin, {}) - for key_id, signature_base64 in signatures: + for key_id, signature_base64 in signatures.items(): signature_bytes = decode_base64(signature_base64) - self.store_pdu_origin_signatures_txn( + self._store_pdu_origin_signature_txn( txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, ) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index bb860f09f0..1f0a680500 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -47,7 +47,7 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(self, txn, "pdu_hashes", { + self._simple_insert_txn(txn, "pdu_hashes", { "pdu_id": pdu_id, "origin": origin, "algorithm": algorithm, @@ -66,7 +66,7 @@ class SignatureStore(SQLBaseStore): query = ( "SELECT key_id, signature" " FROM pdu_origin_signatures" - " WHERE WHERE pdu_id = ? and origin = ?" + " WHERE pdu_id = ? and origin = ?" ) txn.execute(query, (pdu_id, origin)) return dict(txn.fetchall()) @@ -81,7 +81,7 @@ class SignatureStore(SQLBaseStore): key_id (str): Id for the signing key. signature (bytes): The signature. """ - self._simple_insert_txn(self, txn, "pdu_origin_signatures", { + self._simple_insert_txn(txn, "pdu_origin_signatures", { "pdu_id": pdu_id, "origin": origin, "key_id": key_id, diff --git a/tests/federation/test_pdu_codec.py b/tests/federation/test_pdu_codec.py index 344e1baf60..80851a4258 100644 --- a/tests/federation/test_pdu_codec.py +++ b/tests/federation/test_pdu_codec.py @@ -23,14 +23,21 @@ from synapse.federation.units import Pdu from synapse.server import HomeServer -from mock import Mock +from mock import Mock, NonCallableMock + +from ..utils import MockKey class PduCodecTestCase(unittest.TestCase): def setUp(self): - self.hs = HomeServer("blargle.net") - self.event_factory = self.hs.get_event_factory() + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + self.hs = HomeServer( + "blargle.net", + config=self.mock_config, + ) + self.event_factory = self.hs.get_event_factory() self.codec = PduCodec(self.hs) def test_decode_event_id(self): diff --git a/tests/rest/test_events.py b/tests/rest/test_events.py index 79b371c04d..362c7bc01c 100644 --- a/tests/rest/test_events.py +++ b/tests/rest/test_events.py @@ -28,7 +28,7 @@ from synapse.server import HomeServer # python imports import json -from ..utils import MockHttpResource, MemoryDataStore +from ..utils import MockHttpResource, MemoryDataStore, MockKey from .utils import RestTestCase from mock import Mock, NonCallableMock @@ -122,6 +122,9 @@ class EventStreamPermissionsTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "test", db_pool=None, @@ -139,7 +142,7 @@ class EventStreamPermissionsTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) diff --git a/tests/rest/test_profile.py b/tests/rest/test_profile.py index b0f48e7fd8..3a0d1e700a 100644 --- a/tests/rest/test_profile.py +++ b/tests/rest/test_profile.py @@ -18,9 +18,9 @@ from tests import unittest from twisted.internet import defer -from mock import Mock +from mock import Mock, NonCallableMock -from ..utils import MockHttpResource +from ..utils import MockHttpResource, MockKey from synapse.api.errors import SynapseError, AuthError from synapse.server import HomeServer @@ -41,6 +41,9 @@ class ProfileTestCase(unittest.TestCase): "set_avatar_url", ]) + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer("test", db_pool=None, http_client=None, @@ -48,6 +51,7 @@ class ProfileTestCase(unittest.TestCase): federation=Mock(), replication_layer=Mock(), datastore=None, + config=self.mock_config, ) def _get_user_by_req(request=None): diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index 1ce9b8a83d..7170193051 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -27,7 +27,7 @@ from synapse.server import HomeServer import json import urllib -from ..utils import MockHttpResource, MemoryDataStore +from ..utils import MockHttpResource, MemoryDataStore, MockKey from .utils import RestTestCase from mock import Mock, NonCallableMock @@ -50,6 +50,9 @@ class RoomPermissionsTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -61,7 +64,7 @@ class RoomPermissionsTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -408,6 +411,9 @@ class RoomsMemberListTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -419,7 +425,7 @@ class RoomsMemberListTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -497,6 +503,9 @@ class RoomsCreateTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -508,7 +517,7 @@ class RoomsCreateTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -598,6 +607,9 @@ class RoomTopicTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -609,7 +621,7 @@ class RoomTopicTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -712,6 +724,9 @@ class RoomMemberStateTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -723,7 +738,7 @@ class RoomMemberStateTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) @@ -853,6 +868,9 @@ class RoomMessagesTestCase(RestTestCase): persistence_service = Mock(spec=["get_latest_pdus_in_context"]) persistence_service.get_latest_pdus_in_context.return_value = [] + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + hs = HomeServer( "red", db_pool=None, @@ -864,7 +882,7 @@ class RoomMessagesTestCase(RestTestCase): ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), - config=NonCallableMock(), + config=self.mock_config, ) self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) diff --git a/tests/utils.py b/tests/utils.py index 60fd6085ac..d8be73dba8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -118,13 +118,14 @@ class MockHttpResource(HttpServer): class MockKey(object): alg = "mock_alg" version = "mock_version" + signature = b"\x9a\x87$" @property def verify_key(self): return self def sign(self, message): - return b"\x9a\x87$" + return self def verify(self, message, sig): assert sig == b"\x9a\x87$" -- cgit 1.4.1 From bb04447c44036ebf3ae5dde7a4cc7a7909d50ef6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Oct 2014 23:25:12 +0100 Subject: Include hashes of previous pdus when referencing them --- synapse/api/events/__init__.py | 2 +- synapse/federation/pdu_codec.py | 13 ++++--------- synapse/federation/replication.py | 2 +- synapse/federation/units.py | 10 +++++++++- synapse/state.py | 4 ---- synapse/storage/__init__.py | 20 ++++++++++++++------ synapse/storage/pdu.py | 22 ++++++++++++++++------ synapse/storage/schema/signatures.sql | 16 ++++++++++++++++ synapse/storage/signatures.py | 31 +++++++++++++++++++++++++++++++ tests/federation/test_federation.py | 2 +- tests/federation/test_pdu_codec.py | 4 ++-- 11 files changed, 95 insertions(+), 31 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index f66fea2904..a5a55742e0 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -65,13 +65,13 @@ class SynapseEvent(JsonEncodedObject): internal_keys = [ "is_state", - "prev_events", "depth", "destinations", "origin", "outlier", "power_level", "redacted", + "prev_pdus", ] required_keys = [ diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index bcac5f9ae8..11fd7264b3 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -45,9 +45,7 @@ class PduCodec(object): kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - encode_event_id(p[0], p[1]) for p in pdu.prev_pdus - ] + kwargs["prev_pdus"] = pdu.prev_pdus if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): kwargs["prev_state"] = encode_event_id( @@ -78,11 +76,8 @@ class PduCodec(object): d["context"] = event.room_id d["pdu_type"] = event.type - if hasattr(event, "prev_events"): - d["prev_pdus"] = [ - decode_event_id(e, self.server_name) - for e in event.prev_events - ] + if hasattr(event, "prev_pdus"): + d["prev_pdus"] = event.prev_pdus if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( @@ -95,7 +90,7 @@ class PduCodec(object): kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] + if k not in ["event_id", "room_id", "type"] }) if "ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9363ac7300..788a49b8e8 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -443,7 +443,7 @@ class ReplicationLayer(object): min_depth = yield self.store.get_min_depth_for_context(pdu.context) if min_depth and pdu.depth > min_depth: - for pdu_id, origin in pdu.prev_pdus: + for pdu_id, origin, hashes in pdu.prev_pdus: exists = yield self._get_persisted_pdu(pdu_id, origin) if not exists: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 3518efb215..6a43007837 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -141,8 +141,16 @@ class Pdu(JsonEncodedObject): for kid, sig in pdu_tuple.signatures.items() } + prev_pdus = [] + for prev_pdu in pdu_tuple.prev_pdu_list: + prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) + prev_hashes = { + alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() + } + prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) + return Pdu( - prev_pdus=pdu_tuple.prev_pdu_list, + prev_pdus=prev_pdus, **args ) else: diff --git a/synapse/state.py b/synapse/state.py index 9db84c9b5c..bc6b928ec7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -72,10 +72,6 @@ class StateHandler(object): snapshot.fill_out_prev_events(event) - event.prev_events = [ - e for e in event.prev_events if e != event.event_id - ] - current_state = snapshot.prev_state_pdu if current_state: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b2a3f0b56c..af05b47932 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -177,6 +177,14 @@ class DataStore(RoomMemberStore, RoomStore, txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, ) + for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_pdu_hash_txn( + txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, + hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: @@ -352,6 +360,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_pdus = self._get_latest_pdus_in_context( txn, room_id ) + if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( txn, room_id, state_type, state_key @@ -401,17 +410,16 @@ class Snapshot(object): self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): + if hasattr(event, "prev_pdus"): return - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus + event.prev_pdus = [ + (p_id, origin, hashes) + for p_id, origin, hashes, _ in self.prev_pdus ] - event.prev_events = [e for e in es if e != event.event_id] - if self.prev_pdus: - event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1 + event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1 else: event.depth = 0 diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 9d624429b7..a423b42dbd 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -20,10 +20,13 @@ from ._base import SQLBaseStore, Table, JoinHelper from synapse.federation.units import Pdu from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 + from collections import namedtuple import logging + logger = logging.getLogger(__name__) @@ -64,6 +67,8 @@ class PduStore(SQLBaseStore): for r in PduEdgesTable.decode_results(txn.fetchall()) ] + edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin @@ -86,7 +91,7 @@ class PduStore(SQLBaseStore): row = txn.fetchone() if row: results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures + PduEntry(*row), edges, hashes, signatures, edge_hashes )) return results @@ -310,9 +315,14 @@ class PduStore(SQLBaseStore): (context, ) ) - results = txn.fetchall() + results = [] + for pdu_id, origin, depth in txn.fetchall(): + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + sha256_bytes = hashes["sha256"] + prev_hashes = {"sha256": encode_base64(sha256_bytes)} + results.append((pdu_id, origin, prev_hashes, depth)) - return [(row[0], row[1], row[2]) for row in results] + return results @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): @@ -431,7 +441,7 @@ class PduStore(SQLBaseStore): "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" % PduForwardExtremitiesTable.table_name ) - txn.executemany(query, prev_pdus) + txn.executemany(query, list(p[:2] for p in prev_pdus)) # We only insert as a forward extremety the new pdu if there are no # other pdus that reference it as a prev pdu @@ -454,7 +464,7 @@ class PduStore(SQLBaseStore): # deleted in a second if they're incorrect anyway. txn.executemany( PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o in prev_pdus] + [(i, o, context) for i, o, _ in prev_pdus] ) # Also delete from the backwards extremities table all ones that @@ -915,7 +925,7 @@ This does not include a prev_pdus key. PduTuple = namedtuple( "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures") + ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") ) """ This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent the `prev_pdus` key of a PDU. diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index 86ee0f2377..a72c4dc35f 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -34,3 +34,19 @@ CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( CREATE INDEX IF NOT EXISTS pdu_origin_signatures_id ON pdu_origin_signatures ( pdu_id, origin ); + +CREATE TABLE IF NOT EXISTS pdu_edge_hashes( + pdu_id TEXT, + origin TEXT, + prev_pdu_id TEXT, + prev_origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE ( + pdu_id, origin, prev_pdu_id, prev_origin, algorithm + ) +); + +CREATE INDEX IF NOT EXISTS pdu_edge_hashes_id ON pdu_edge_hashes( + pdu_id, origin +); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1f0a680500..1147102489 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -88,3 +88,34 @@ class SignatureStore(SQLBaseStore): "signature": buffer(signature_bytes), }) + def _get_prev_pdu_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for previous PDUs of a PDU + Args: + txn (cursor): + pdu_id (str): Id of the PDU. + origin (str): Origin of the PDU. + Returns: + dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. + """ + query = ( + "SELECT prev_pdu_id, prev_origin, algorithm, hash" + " FROM pdu_edge_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + results = {} + for prev_pdu_id, prev_origin, algorithm, hash_bytes in txn.fetchall(): + hashes = results.setdefault((prev_pdu_id, prev_origin), {}) + hashes[algorithm] = hash_bytes + return results + + def _store_prev_pdu_hash_txn(self, txn, pdu_id, origin, prev_pdu_id, + prev_origin, algorithm, hash_bytes): + self._simple_insert_txn(txn, "pdu_edge_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "prev_pdu_id": prev_pdu_id, + "prev_origin": prev_origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 03b2167cf7..eed50e6335 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -41,7 +41,7 @@ def make_pdu(prev_pdus=[], **kwargs): } pdu_fields.update(kwargs) - return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}) + return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}, {}) class FederationTestCase(unittest.TestCase): diff --git a/tests/federation/test_pdu_codec.py b/tests/federation/test_pdu_codec.py index 80851a4258..0ad8cf6641 100644 --- a/tests/federation/test_pdu_codec.py +++ b/tests/federation/test_pdu_codec.py @@ -88,7 +88,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) def test_pdu_from_event(self): @@ -144,7 +144,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) self.assertEquals(pdu.state_key, event.state_key) -- cgit 1.4.1 From c8f996e29ffd7055bc6521ea610fc12ff50502e5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 11:40:35 +0100 Subject: Hash the same content covered by the signature when referencing previous PDUs rather than reusing the PDU content hashes --- synapse/crypto/event_signing.py | 19 +++++++++++---- synapse/federation/pdu_codec.py | 6 +++-- synapse/storage/__init__.py | 9 ++++++- synapse/storage/pdu.py | 4 ++-- synapse/storage/schema/signatures.sql | 18 ++++++++++++-- synapse/storage/signatures.py | 44 +++++++++++++++++++++++++++++++---- 6 files changed, 84 insertions(+), 16 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index a115967c0a..32d60bd30a 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -24,15 +24,15 @@ from syutil.crypto.jsonsign import sign_json, verify_signed_json import hashlib -def hash_event_pdu(pdu, hash_algortithm=hashlib.sha256): - hashed = _compute_hash(pdu, hash_algortithm) +def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): + hashed = _compute_content_hash(pdu, hash_algorithm) pdu.hashes[hashed.name] = encode_base64(hashed.digest()) return pdu -def check_event_pdu_hash(pdu, hash_algorithm=hashlib.sha256): +def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_hash(pdu, hash_algortithm) + computed_hash = _compute_content_hash(pdu, hash_algortithm) if computed_hash.name not in pdu.hashes: raise Exception("Algorithm %s not in hashes %s" % ( computed_hash.name, list(pdu.hashes) @@ -45,7 +45,7 @@ def check_event_pdu_hash(pdu, hash_algorithm=hashlib.sha256): return message_hash_bytes == computed_hash.digest() -def _compute_hash(pdu, hash_algorithm): +def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() pdu_json.pop("meta", None) pdu_json.pop("signatures", None) @@ -54,6 +54,15 @@ def _compute_hash(pdu, hash_algorithm): return hash_algorithm(pdu_json_bytes) +def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): + tmp_pdu = Pdu(**pdu.get_dict()) + tmp_pdu = prune_pdu(tmp_pdu) + pdu_json = tmp_pdu.get_dict() + pdu_json_bytes = encode_canonical_json(pdu_json) + hashed = hash_algorithm(pdu_json_bytes) + return (hashed.name, hashed.digest()) + + def sign_event_pdu(pdu, signature_name, signing_key): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 11fd7264b3..7e574f451d 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,7 +14,9 @@ # limitations under the License. from .units import Pdu -from synapse.crypto.event_signing import hash_event_pdu, sign_event_pdu +from synapse.crypto.event_signing import ( + add_event_pdu_content_hash, sign_event_pdu +) import copy @@ -97,5 +99,5 @@ class PduCodec(object): kwargs["ts"] = int(self.clock.time_msec()) pdu = Pdu(**kwargs) - pdu = hash_event_pdu(pdu) + pdu = add_event_pdu_content_hash(pdu) return sign_event_pdu(pdu, self.server_name, self.signing_key) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index af05b47932..1738260cc1 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,8 @@ from .signatures import SignatureStore from syutil.base64util import decode_base64 +from synapse.crypto.event_signing import compute_pdu_event_reference_hash + import json import logging import os @@ -165,7 +167,7 @@ class DataStore(RoomMemberStore, RoomStore, for hash_alg, hash_base64 in pdu.hashes.items(): hash_bytes = decode_base64(hash_base64) - self._store_pdu_hash_txn( + self._store_pdu_content_hash_txn( txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, ) @@ -185,6 +187,11 @@ class DataStore(RoomMemberStore, RoomStore, hash_bytes ) + (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + self._store_pdu_reference_hash_txn( + txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a423b42dbd..3a90c382f0 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -69,7 +69,7 @@ class PduStore(SQLBaseStore): edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) - hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_content_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin ) @@ -317,7 +317,7 @@ class PduStore(SQLBaseStore): results = [] for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) sha256_bytes = hashes["sha256"] prev_hashes = {"sha256": encode_base64(sha256_bytes)} results.append((pdu_id, origin, prev_hashes, depth)) diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index a72c4dc35f..1c45a51bec 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -13,7 +13,7 @@ * limitations under the License. */ -CREATE TABLE IF NOT EXISTS pdu_hashes ( +CREATE TABLE IF NOT EXISTS pdu_content_hashes ( pdu_id TEXT, origin TEXT, algorithm TEXT, @@ -21,7 +21,21 @@ CREATE TABLE IF NOT EXISTS pdu_hashes ( CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) ); -CREATE INDEX IF NOT EXISTS pdu_hashes_id ON pdu_hashes (pdu_id, origin); +CREATE INDEX IF NOT EXISTS pdu_content_hashes_id ON pdu_content_hashes ( + pdu_id, origin +); + +CREATE TABLE IF NOT EXISTS pdu_reference_hashes ( + pdu_id TEXT, + origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) +); + +CREATE INDEX IF NOT EXISTS pdu_reference_hashes_id ON pdu_reference_hashes ( + pdu_id, origin +); CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( pdu_id TEXT, diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1147102489..85eec7ffbe 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -21,7 +21,7 @@ from twisted.internet import defer class SignatureStore(SQLBaseStore): """Persistence for PDU signatures and hashes""" - def _get_pdu_hashes_txn(self, txn, pdu_id, origin): + def _get_pdu_content_hashes_txn(self, txn, pdu_id, origin): """Get all the hashes for a given PDU. Args: txn (cursor): @@ -32,13 +32,14 @@ class SignatureStore(SQLBaseStore): """ query = ( "SELECT algorithm, hash" - " FROM pdu_hashes" + " FROM pdu_content_hashes" " WHERE pdu_id = ? and origin = ?" ) txn.execute(query, (pdu_id, origin)) return dict(txn.fetchall()) - def _store_pdu_hash_txn(self, txn, pdu_id, origin, algorithm, hash_bytes): + def _store_pdu_content_hash_txn(self, txn, pdu_id, origin, algorithm, + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -47,13 +48,48 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(txn, "pdu_hashes", { + self._simple_insert_txn(txn, "pdu_content_hashes", { "pdu_id": pdu_id, "origin": origin, "algorithm": algorithm, "hash": buffer(hash_bytes), }) + def _get_pdu_reference_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for a given PDU. + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM pdu_reference_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + return dict(txn.fetchall()) + + def _store_pdu_reference_hash_txn(self, txn, pdu_id, origin, algorithm, + hash_bytes): + """Store a hash for a PDU + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "pdu_reference_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + def _get_pdu_origin_signatures_txn(self, txn, pdu_id, origin): """Get all the signatures for a given PDU. Args: -- cgit 1.4.1 From 5e2236f9ffe3a66bbe0ff37b1793e8fa59a1c475 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 27 Oct 2014 11:19:15 +0000 Subject: fix pyflakes warnings --- synapse/crypto/event_signing.py | 8 ++++---- synapse/federation/units.py | 2 ++ synapse/storage/signatures.py | 2 -- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index d3b501c6e7..61edd2c6f9 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -35,12 +35,12 @@ def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_content_hash(pdu, hash_algortithm) + computed_hash = _compute_content_hash(pdu, hash_algorithm) if computed_hash.name not in pdu.hashes: raise Exception("Algorithm %s not in hashes %s" % ( computed_hash.name, list(pdu.hashes) )) - message_hash_base64 = hashes[computed_hash.name] + message_hash_base64 = pdu.hashes[computed_hash.name] try: message_hash_bytes = decode_base64(message_hash_base64) except: @@ -54,7 +54,7 @@ def _compute_content_hash(pdu, hash_algorithm): pdu_json.pop("age_ts", None) pdu_json.pop("unsigned", None) pdu_json.pop("signatures", None) - hashes = pdu_json.pop("hashes", {}) + pdu_json.pop("hashes", None) pdu_json_bytes = encode_canonical_json(pdu_json) return hash_algorithm(pdu_json_bytes) @@ -73,7 +73,7 @@ def sign_event_pdu(pdu, signature_name, signing_key): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) pdu_json = tmp_pdu.get_dict() - pdu_jdon = sign_json(pdu_json, signature_name, signing_key) + pdu_json = sign_json(pdu_json, signature_name, signing_key) pdu.signatures = pdu_json["signatures"] return pdu diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b779d259bd..adc3385644 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -155,6 +155,8 @@ class Pdu(JsonEncodedObject): return Pdu( prev_pdus=prev_pdus, + hashes=hashes, + signatures=signatures, **args ) else: diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 85eec7ffbe..82be946d3f 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -15,8 +15,6 @@ from _base import SQLBaseStore -from twisted.internet import defer - class SignatureStore(SQLBaseStore): """Persistence for PDU signatures and hashes""" -- cgit 1.4.1 From 2d1dfb3b34583a4de7e1e53f685c2564a7fc731f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Oct 2014 16:42:35 +0000 Subject: Begin implementing all the PDU storage stuff in Events land --- synapse/api/events/__init__.py | 4 +- synapse/federation/pdu_codec.py | 11 ++- synapse/storage/__init__.py | 72 ++++++++++---- synapse/storage/_base.py | 53 +++++++---- synapse/storage/event_federation.py | 143 ++++++++++++++++++++++++++++ synapse/storage/schema/event_edges.sql | 51 ++++++++++ synapse/storage/schema/event_signatures.sql | 65 +++++++++++++ synapse/storage/schema/im.sql | 1 + synapse/storage/signatures.py | 127 ++++++++++++++++++++++++ 9 files changed, 485 insertions(+), 42 deletions(-) create mode 100644 synapse/storage/event_federation.py create mode 100644 synapse/storage/schema/event_edges.sql create mode 100644 synapse/storage/schema/event_signatures.sql (limited to 'synapse/storage/signatures.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index a5a55742e0..b855811b98 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -71,7 +71,9 @@ class SynapseEvent(JsonEncodedObject): "outlier", "power_level", "redacted", - "prev_pdus", + "prev_events", + "hashes", + "signatures", ] required_keys = [ diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 991aae2a56..2cd591410b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -47,7 +47,10 @@ class PduCodec(object): kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type - kwargs["prev_pdus"] = pdu.prev_pdus + kwargs["prev_events"] = [ + encode_event_id(i, o) + for i, o in pdu.prev_pdus + ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): kwargs["prev_state"] = encode_event_id( @@ -78,8 +81,8 @@ class PduCodec(object): d["context"] = event.room_id d["pdu_type"] = event.type - if hasattr(event, "prev_pdus"): - d["prev_pdus"] = event.prev_pdus + if hasattr(event, "prev_events"): + d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events] if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( @@ -92,7 +95,7 @@ class PduCodec(object): kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type"] + if k not in ["event_id", "room_id", "type", "prev_events"] }) if "origin_server_ts" not in kwargs: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a50e19349a..678de0cf50 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -40,6 +40,7 @@ from .stream import StreamStore from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore +from .event_federation import EventFederationStore from .state import StateStore from .signatures import SignatureStore @@ -69,6 +70,7 @@ SCHEMAS = [ "redactions", "state", "signatures", + "event_edges", ] @@ -83,10 +85,12 @@ class _RollbackButIsFineException(Exception): """ pass + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, - DirectoryStore, KeyStore, StateStore, SignatureStore): + DirectoryStore, KeyStore, StateStore, SignatureStore, + EventFederationStore, ): def __init__(self, hs): super(DataStore, self).__init__(hs) @@ -230,6 +234,10 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == RoomRedactionEvent.TYPE: self._store_redaction(txn, event) + outlier = False + if hasattr(event, "outlier"): + outlier = event.outlier + vals = { "topological_ordering": event.depth, "event_id": event.event_id, @@ -237,20 +245,20 @@ class DataStore(RoomMemberStore, RoomStore, "room_id": event.room_id, "content": json.dumps(event.content), "processed": True, + "outlier": outlier, + "depth": event.depth, } if stream_ordering is not None: vals["stream_ordering"] = stream_ordering - if hasattr(event, "outlier"): - vals["outlier"] = event.outlier - else: - vals["outlier"] = False - unrec = { k: v for k, v in event.get_full_dict().items() - if k not in vals.keys() and k not in ["redacted", "redacted_because"] + if k not in vals.keys() and k not in [ + "redacted", "redacted_because", "signatures", "hashes", + "prev_events", + ] } vals["unrecognized_keys"] = json.dumps(unrec) @@ -264,6 +272,14 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + self._store_state_groups_txn(txn, event) is_state = hasattr(event, "state_key") and event.state_key is not None @@ -291,6 +307,28 @@ class DataStore(RoomMemberStore, RoomStore, } ) + signatures = event.signatures.get(event.origin, {}) + + for key_id, signature_base64 in signatures.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_origin_signature_txn( + txn, event.event_id, key_id, signature_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + self._store_pdu_reference_hash_txn( + txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes + ) + + self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) + def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " @@ -373,7 +411,7 @@ class DataStore(RoomMemberStore, RoomStore, """ def _snapshot(txn): membership_state = self._get_room_member(txn, user_id, room_id) - prev_pdus = self._get_latest_pdus_in_context( + prev_events = self._get_latest_events_in_room( txn, room_id ) @@ -388,7 +426,7 @@ class DataStore(RoomMemberStore, RoomStore, store=self, room_id=room_id, user_id=user_id, - prev_pdus=prev_pdus, + prev_events=prev_events, membership_state=membership_state, state_type=state_type, state_key=state_key, @@ -404,7 +442,7 @@ class Snapshot(object): store (DataStore): The datastore. room_id (RoomId): The room of the snapshot. user_id (UserId): The user this snapshot is for. - prev_pdus (list): The list of PDU ids this snapshot is after. + prev_events (list): The list of event ids this snapshot is after. membership_state (RoomMemberEvent): The current state of the user in the room. state_type (str, optional): State type captured by the snapshot @@ -413,29 +451,29 @@ class Snapshot(object): the previous value of the state type and key in the room. """ - def __init__(self, store, room_id, user_id, prev_pdus, + def __init__(self, store, room_id, user_id, prev_events, membership_state, state_type=None, state_key=None, prev_state_pdu=None): self.store = store self.room_id = room_id self.user_id = user_id - self.prev_pdus = prev_pdus + self.prev_events = prev_events self.membership_state = membership_state self.state_type = state_type self.state_key = state_key self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_pdus"): + if hasattr(event, "prev_events"): return - event.prev_pdus = [ + event.prev_events = [ (p_id, origin, hashes) - for p_id, origin, hashes, _ in self.prev_pdus + for p_id, origin, hashes, _ in self.prev_events ] - if self.prev_pdus: - event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1 + if self.prev_events: + event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1 else: event.depth = 0 diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1192216971..30732caa83 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -193,7 +193,6 @@ class SQLBaseStore(object): table, keyvalues, retcols=retcols, allow_none=allow_none ) - @defer.inlineCallbacks def _simple_select_one_onecol(self, table, keyvalues, retcol, allow_none=False): """Executes a SELECT query on the named table, which is expected to @@ -204,19 +203,41 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with retcol : string giving the name of the column to return """ - ret = yield self._simple_select_one( + return self.runInteraction( + "_simple_select_one_onecol_txn", + self._simple_select_one_onecol_txn, + table, keyvalues, retcol, allow_none=allow_none, + ) + + def _simple_select_one_onecol_txn(self, txn, table, keyvalues, retcol, + allow_none=False): + ret = self._simple_select_onecol_txn( + txn, table=table, keyvalues=keyvalues, - retcols=[retcol], - allow_none=allow_none + retcols=retcol, ) if ret: - defer.returnValue(ret[retcol]) + return ret[retcol] else: - defer.returnValue(None) + if allow_none: + return None + else: + raise StoreError(404, "No row found") + + def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): + sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % { + "retcol": retcol, + "table": table, + "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + } + + txn.execute(sql, keyvalues.values()) + + return [r[0] for r in txn.fetchall()] + - @defer.inlineCallbacks def _simple_select_onecol(self, table, keyvalues, retcol): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -229,19 +250,11 @@ class SQLBaseStore(object): Returns: Deferred: Results in a list """ - sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % { - "retcol": retcol, - "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), - } - - def func(txn): - txn.execute(sql, keyvalues.values()) - return txn.fetchall() - - res = yield self.runInteraction("_simple_select_onecol", func) - - defer.returnValue([r[0] for r in res]) + return self.runInteraction( + "_simple_select_onecol", + self._simple_select_onecol_txn, + table, keyvalues, retcol + ) def _simple_select_list(self, table, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py new file mode 100644 index 0000000000..27ad9aea4d --- /dev/null +++ b/synapse/storage/event_federation.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging + + +logger = logging.getLogger(__name__) + + +class EventFederationStore(SQLBaseStore): + + def _get_latest_events_in_room(self, txn, room_id): + self._simple_select_onecol_txn( + txn, + table="event_forward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + ) + + results = [] + for pdu_id, origin, depth in txn.fetchall(): + hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) + sha256_bytes = hashes["sha256"] + prev_hashes = {"sha256": encode_base64(sha256_bytes)} + results.append((pdu_id, origin, prev_hashes, depth)) + + def _get_min_depth_interaction(self, txn, room_id): + min_depth = self._simple_select_one_onecol_txn( + txn, + table="room_depth", + keyvalues={"room_id": room_id,}, + retcol="min_depth", + allow_none=True, + ) + + return int(min_depth) if min_depth is not None else None + + def _update_min_depth_for_room_txn(self, txn, room_id, depth): + min_depth = self._get_min_depth_interaction(txn, room_id) + + do_insert = depth < min_depth if min_depth else True + + if do_insert: + self._simple_insert_txn( + txn, + table="room_depth", + values={ + "room_id": room_id, + "min_depth": depth, + }, + or_replace=True, + ) + + def _handle_prev_events(self, txn, outlier, event_id, prev_events, + room_id): + for e_id in prev_events: + # TODO (erikj): This could be done as a bulk insert + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event_id, + "prev_event": e_id, + "room_id": room_id, + } + ) + + # Update the extremities table if this is not an outlier. + if not outlier: + for e_id in prev_events: + # TODO (erikj): This could be done as a bulk insert + self._simple_delete_txn( + txn, + table="event_forward_extremities", + keyvalues={ + "event_id": e_id, + "room_id": room_id, + } + ) + + + + # We only insert as a forward extremity the new pdu if there are no + # other pdus that reference it as a prev pdu + query = ( + "INSERT INTO %(table)s (event_id, room_id) " + "SELECT ?, ? WHERE NOT EXISTS (" + "SELECT 1 FROM %(event_edges)s WHERE " + "prev_event_id = ? " + ")" + ) % { + "table": "event_forward_extremities", + "event_edges": "event_edges", + } + + logger.debug("query: %s", query) + + txn.execute(query, (event_id, room_id, event_id)) + + # Insert all the prev_pdus as a backwards thing, they'll get + # deleted in a second if they're incorrect anyway. + for e_id in prev_events: + # TODO (erikj): This could be done as a bulk insert + self._simple_insert_txn( + txn, + table="event_backward_extremities", + values={ + "event_id": e_id, + "room_id": room_id, + } + ) + + # Also delete from the backwards extremities table all ones that + # reference pdus that we have already seen + query = ( + "DELETE FROM %(event_back)s as b WHERE EXISTS (" + "SELECT 1 FROM %(events)s AS events " + "WHERE " + "b.event_id = events.event_id " + "AND not events.outlier " + ")" + ) % { + "event_back": "event_backward_extremities", + "events": "events", + } + txn.execute(query) \ No newline at end of file diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql new file mode 100644 index 0000000000..6a28314ece --- /dev/null +++ b/synapse/storage/schema/event_edges.sql @@ -0,0 +1,51 @@ + +CREATE TABLE IF NOT EXISTS event_forward_extremities( + event_id TEXT, + room_id TEXT, + CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); +-- + +CREATE TABLE IF NOT EXISTS event_backward_extremities( + event_id TEXT, + room_id TEXT, + CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); +-- + +CREATE TABLE IF NOT EXISTS event_edges( + event_id TEXT, + prev_event_id TEXT, + room_id TEXT, + CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id) +); + +CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); +CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); +-- + + +CREATE TABLE IF NOT EXISTS room_depth( + room_id TEXT, + min_depth INTEGER, + CONSTRAINT uniqueness UNIQUE (room_id) +); + +CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); +-- + +create TABLE IF NOT EXISTS event_destinations( + event_id TEXT, + destination TEXT, + delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered + CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); +-- \ No newline at end of file diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql new file mode 100644 index 0000000000..5491c7ecec --- /dev/null +++ b/synapse/storage/schema/event_signatures.sql @@ -0,0 +1,65 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_content_hashes ( + event_id TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (event_id, algorithm) +); + +CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes( + event_id +); + + +CREATE TABLE IF NOT EXISTS event_reference_hashes ( + event_id TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (event_id, algorithm) +); + +CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes ( + event_id +); + + +CREATE TABLE IF NOT EXISTS event_origin_signatures ( + event_id TEXT, + origin TEXT, + key_id TEXT, + signature BLOB, + CONSTRAINT uniqueness UNIQUE (event_id, key_id) +); + +CREATE INDEX IF NOT EXISTS event_origin_signatures_id ON event_origin_signatures ( + event_id +); + + +CREATE TABLE IF NOT EXISTS event_edge_hashes( + event_id TEXT, + prev_event_id TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE ( + event_id, prev_event_id, algorithm + ) +); + +CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes( + event_id +); diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 3aa83f5c8c..8d6f655993 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS events( unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, + depth INTEGER DEFAULT 0 NOT NULL, CONSTRAINT ev_uniq UNIQUE (event_id) ); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 82be946d3f..b8f8fd44cb 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -153,3 +153,130 @@ class SignatureStore(SQLBaseStore): "algorithm": algorithm, "hash": buffer(hash_bytes), }) + + ## Events ## + + def _get_event_content_hashes_txn(self, txn, event_id): + """Get all the hashes for a given Event. + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM event_content_hashes" + " WHERE event_id = ?" + ) + txn.execute(query, (event_id, )) + return dict(txn.fetchall()) + + def _store_event_content_hash_txn(self, txn, event_id, algorithm, + hash_bytes): + """Store a hash for a Event + Args: + txn (cursor): + event_id (str): Id for the Event. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "event_content_hashes", { + "event_id": event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + def _get_event_reference_hashes_txn(self, txn, event_id): + """Get all the hashes for a given PDU. + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM event_reference_hashes" + " WHERE event_id = ?" + ) + txn.execute(query, (event_id, )) + return dict(txn.fetchall()) + + def _store_event_reference_hash_txn(self, txn, event_id, algorithm, + hash_bytes): + """Store a hash for a PDU + Args: + txn (cursor): + event_id (str): Id for the Event. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "event_reference_hashes", { + "event_id": event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + + def _get_event_origin_signatures_txn(self, txn, event_id): + """Get all the signatures for a given PDU. + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + A dict of key_id -> signature_bytes. + """ + query = ( + "SELECT key_id, signature" + " FROM event_origin_signatures" + " WHERE event_id = ? " + ) + txn.execute(query, (event_id, )) + return dict(txn.fetchall()) + + def _store_event_origin_signature_txn(self, txn, event_id, origin, key_id, + signature_bytes): + """Store a signature from the origin server for a PDU. + Args: + txn (cursor): + event_id (str): Id for the Event. + origin (str): origin of the Event. + key_id (str): Id for the signing key. + signature (bytes): The signature. + """ + self._simple_insert_txn(txn, "event_origin_signatures", { + "event_id": event_id, + "origin": origin, + "key_id": key_id, + "signature": buffer(signature_bytes), + }) + + def _get_prev_event_hashes_txn(self, txn, event_id): + """Get all the hashes for previous PDUs of a PDU + Args: + txn (cursor): + event_id (str): Id for the Event. + Returns: + dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. + """ + query = ( + "SELECT prev_event_id, algorithm, hash" + " FROM event_edge_hashes" + " WHERE event_id = ?" + ) + txn.execute(query, (event_id, )) + results = {} + for prev_event_id, algorithm, hash_bytes in txn.fetchall(): + hashes = results.setdefault(prev_event_id, {}) + hashes[algorithm] = hash_bytes + return results + + def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, + algorithm, hash_bytes): + self._simple_insert_txn(txn, "event_edge_hashes", { + "event_id": event_id, + "prev_event_id": prev_event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) \ No newline at end of file -- cgit 1.4.1 From aa80900a8e8cd9e7305a66cec336a8e150c46651 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Oct 2014 10:11:06 +0000 Subject: Fix SQL so that accepts we may want to persist events twice. --- synapse/storage/event_federation.py | 8 +++-- synapse/storage/signatures.py | 64 ++++++++++++++++++++++++------------- 2 files changed, 47 insertions(+), 25 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 5f94c31818..88d09d9ba8 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -99,7 +99,8 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, - } + }, + or_ignore=True, ) # Update the extremities table if this is not an outlier. @@ -120,7 +121,7 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new pdu if there are no # other pdus that reference it as a prev pdu query = ( - "INSERT INTO %(table)s (event_id, room_id) " + "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " "SELECT ?, ? WHERE NOT EXISTS (" "SELECT 1 FROM %(event_edges)s WHERE " "prev_event_id = ? " @@ -144,7 +145,8 @@ class EventFederationStore(SQLBaseStore): values={ "event_id": e_id, "room_id": room_id, - } + }, + or_ignore=True, ) # Also delete from the backwards extremities table all ones that diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index b8f8fd44cb..5e99174fcd 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -181,11 +181,16 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(txn, "event_content_hashes", { - "event_id": event_id, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }) + self._simple_insert_txn( + txn, + "event_content_hashes", + { + "event_id": event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }, + or_ignore=True, + ) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. @@ -212,11 +217,16 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(txn, "event_reference_hashes", { - "event_id": event_id, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }) + self._simple_insert_txn( + txn, + "event_reference_hashes", + { + "event_id": event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }, + or_ignore=True, + ) def _get_event_origin_signatures_txn(self, txn, event_id): @@ -245,12 +255,17 @@ class SignatureStore(SQLBaseStore): key_id (str): Id for the signing key. signature (bytes): The signature. """ - self._simple_insert_txn(txn, "event_origin_signatures", { - "event_id": event_id, - "origin": origin, - "key_id": key_id, - "signature": buffer(signature_bytes), - }) + self._simple_insert_txn( + txn, + "event_origin_signatures", + { + "event_id": event_id, + "origin": origin, + "key_id": key_id, + "signature": buffer(signature_bytes), + }, + or_ignore=True, + ) def _get_prev_event_hashes_txn(self, txn, event_id): """Get all the hashes for previous PDUs of a PDU @@ -274,9 +289,14 @@ class SignatureStore(SQLBaseStore): def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, algorithm, hash_bytes): - self._simple_insert_txn(txn, "event_edge_hashes", { - "event_id": event_id, - "prev_event_id": prev_event_id, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }) \ No newline at end of file + self._simple_insert_txn( + txn, + "event_edge_hashes", + { + "event_id": event_id, + "prev_event_id": prev_event_id, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }, + or_ignore=True, + ) \ No newline at end of file -- cgit 1.4.1 From 80b2710e6f9b7520f49f82b117634294c63fdb1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 17:08:36 +0000 Subject: Remove unused signature storage methods --- synapse/storage/signatures.py | 139 +----------------------------------------- 1 file changed, 1 insertion(+), 138 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 5e99174fcd..b4b3d5d7ea 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -17,144 +17,7 @@ from _base import SQLBaseStore class SignatureStore(SQLBaseStore): - """Persistence for PDU signatures and hashes""" - - def _get_pdu_content_hashes_txn(self, txn, pdu_id, origin): - """Get all the hashes for a given PDU. - Args: - txn (cursor): - pdu_id (str): Id for the PDU. - origin (str): origin of the PDU. - Returns: - A dict of algorithm -> hash. - """ - query = ( - "SELECT algorithm, hash" - " FROM pdu_content_hashes" - " WHERE pdu_id = ? and origin = ?" - ) - txn.execute(query, (pdu_id, origin)) - return dict(txn.fetchall()) - - def _store_pdu_content_hash_txn(self, txn, pdu_id, origin, algorithm, - hash_bytes): - """Store a hash for a PDU - Args: - txn (cursor): - pdu_id (str): Id for the PDU. - origin (str): origin of the PDU. - algorithm (str): Hashing algorithm. - hash_bytes (bytes): Hash function output bytes. - """ - self._simple_insert_txn(txn, "pdu_content_hashes", { - "pdu_id": pdu_id, - "origin": origin, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }) - - def _get_pdu_reference_hashes_txn(self, txn, pdu_id, origin): - """Get all the hashes for a given PDU. - Args: - txn (cursor): - pdu_id (str): Id for the PDU. - origin (str): origin of the PDU. - Returns: - A dict of algorithm -> hash. - """ - query = ( - "SELECT algorithm, hash" - " FROM pdu_reference_hashes" - " WHERE pdu_id = ? and origin = ?" - ) - txn.execute(query, (pdu_id, origin)) - return dict(txn.fetchall()) - - def _store_pdu_reference_hash_txn(self, txn, pdu_id, origin, algorithm, - hash_bytes): - """Store a hash for a PDU - Args: - txn (cursor): - pdu_id (str): Id for the PDU. - origin (str): origin of the PDU. - algorithm (str): Hashing algorithm. - hash_bytes (bytes): Hash function output bytes. - """ - self._simple_insert_txn(txn, "pdu_reference_hashes", { - "pdu_id": pdu_id, - "origin": origin, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }) - - - def _get_pdu_origin_signatures_txn(self, txn, pdu_id, origin): - """Get all the signatures for a given PDU. - Args: - txn (cursor): - pdu_id (str): Id for the PDU. - origin (str): origin of the PDU. - Returns: - A dict of key_id -> signature_bytes. - """ - query = ( - "SELECT key_id, signature" - " FROM pdu_origin_signatures" - " WHERE pdu_id = ? and origin = ?" - ) - txn.execute(query, (pdu_id, origin)) - return dict(txn.fetchall()) - - def _store_pdu_origin_signature_txn(self, txn, pdu_id, origin, key_id, - signature_bytes): - """Store a signature from the origin server for a PDU. - Args: - txn (cursor): - pdu_id (str): Id for the PDU. - origin (str): origin of the PDU. - key_id (str): Id for the signing key. - signature (bytes): The signature. - """ - self._simple_insert_txn(txn, "pdu_origin_signatures", { - "pdu_id": pdu_id, - "origin": origin, - "key_id": key_id, - "signature": buffer(signature_bytes), - }) - - def _get_prev_pdu_hashes_txn(self, txn, pdu_id, origin): - """Get all the hashes for previous PDUs of a PDU - Args: - txn (cursor): - pdu_id (str): Id of the PDU. - origin (str): Origin of the PDU. - Returns: - dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. - """ - query = ( - "SELECT prev_pdu_id, prev_origin, algorithm, hash" - " FROM pdu_edge_hashes" - " WHERE pdu_id = ? and origin = ?" - ) - txn.execute(query, (pdu_id, origin)) - results = {} - for prev_pdu_id, prev_origin, algorithm, hash_bytes in txn.fetchall(): - hashes = results.setdefault((prev_pdu_id, prev_origin), {}) - hashes[algorithm] = hash_bytes - return results - - def _store_prev_pdu_hash_txn(self, txn, pdu_id, origin, prev_pdu_id, - prev_origin, algorithm, hash_bytes): - self._simple_insert_txn(txn, "pdu_edge_hashes", { - "pdu_id": pdu_id, - "origin": origin, - "prev_pdu_id": prev_pdu_id, - "prev_origin": prev_origin, - "algorithm": algorithm, - "hash": buffer(hash_bytes), - }) - - ## Events ## + """Persistence for event signatures and hashes""" def _get_event_content_hashes_txn(self, txn, event_id): """Get all the hashes for a given Event. -- cgit 1.4.1 From bf6b72eb558cca94e209a541188079750bfefea0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Nov 2014 18:42:18 +0000 Subject: Start implementing auth chains --- synapse/api/auth.py | 3 +- synapse/api/events/__init__.py | 2 +- synapse/handlers/_base.py | 59 ++++++++++++++++++++++++++++++++-- synapse/storage/__init__.py | 12 ++++++- synapse/storage/_base.py | 2 ++ synapse/storage/event_federation.py | 21 ++++++++++++ synapse/storage/schema/event_edges.sql | 10 ++++++ synapse/storage/signatures.py | 12 +++++++ 8 files changed, 115 insertions(+), 6 deletions(-) (limited to 'synapse/storage/signatures.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bb25c4ec55..e1302553d7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -21,8 +21,7 @@ from synapse.api.constants import Membership, JoinRules from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, - RoomJoinRulesEvent, InviteJoinEvent, - RoomCreateEvent, + RoomJoinRulesEvent, RoomCreateEvent, ) from synapse.util.logutils import log_function diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 84d3a98365..513a48f568 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -61,7 +61,7 @@ class SynapseEvent(JsonEncodedObject): "replaces_state", "redacted_because", "origin_server_ts", - "auth_chains", + "auth_events", ] internal_keys = [ diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 509f7b550c..2613fa7fce 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -14,11 +14,15 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.errors import LimitExceededError +from synapse.api.errors import LimitExceededError from synapse.util.async import run_on_reactor - from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.events.room import ( + RoomCreateEvent, RoomMemberEvent, RoomPowerLevelsEvent, RoomJoinRulesEvent, +) +from synapse.api.constants import Membership, JoinRules +from syutil.base64util import encode_base64 import logging @@ -55,6 +59,53 @@ class BaseHandler(object): retry_after_ms=int(1000*(time_allowed - time_now)), ) + @defer.inlineCallbacks + def _add_auth(self, event): + if event.type == RoomCreateEvent.TYPE: + event.auth_events = [] + return + + auth_events = [] + + key = (RoomPowerLevelsEvent.TYPE, "", ) + power_level_event = event.old_state_events.get(key) + + if power_level_event: + auth_events.append(power_level_event.event_id) + + key = (RoomJoinRulesEvent.TYPE, "", ) + join_rule_event = event.old_state_events.get(key) + + key = (RoomMemberEvent.TYPE, event.user_id, ) + member_event = event.old_state_events.get(key) + + if join_rule_event: + join_rule = join_rule_event.content.get("join_rule") + is_public = join_rule == JoinRules.PUBLIC if join_rule else False + + if event.type == RoomMemberEvent.TYPE: + if event.content["membership"] == Membership.JOIN: + if is_public: + auth_events.append(join_rule_event.event_id) + elif member_event: + auth_events.append(member_event.event_id) + + if member_event: + if member_event.content["membership"] == Membership.JOIN: + auth_events.append(member_event.event_id) + + hashes = yield self.store.get_event_reference_hashes( + auth_events + ) + hashes = [ + { + k: encode_base64(v) for k, v in h.items() + if k == "sha256" + } + for h in hashes + ] + event.auth_events = zip(auth_events, hashes) + @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): @@ -64,6 +115,8 @@ class BaseHandler(object): yield self.state_handler.annotate_state_groups(event) + yield self._add_auth(event) + logger.debug("Signing event...") add_hashes_and_signatures( @@ -76,6 +129,8 @@ class BaseHandler(object): logger.debug("Authing...") self.auth.check(event, raises=True) logger.debug("Authed") + else: + logger.debug("Suppressed auth.") yield self.store.persist_event(event) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2a1970914f..48ad4d864f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -19,7 +19,6 @@ from synapse.api.events.room import ( RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent, RoomJoinRulesEvent, - RoomPowerLevelsEvent, RoomRedactionEvent, ) @@ -302,6 +301,17 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, prev_event_id, alg, hash_bytes ) + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( txn, event.event_id, ref_alg, ref_hash_bytes diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7821fc4726..9aa404695d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -474,6 +474,8 @@ class SQLBaseStore(object): if is_state == 0 ] + ev.auth_events = self._get_auth_events(txn, ev.event_id) + if hasattr(ev, "state_key"): ev.prev_state = [ (e_id, h) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 180a764134..86c68ebf87 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -139,6 +139,27 @@ class EventFederationStore(SQLBaseStore): return results + def _get_auth_events(self, txn, event_id): + auth_ids = self._simple_select_onecol_txn( + txn, + table="event_auth", + keyvalues={ + "event_id": event_id, + }, + retcol="auth_id", + ) + + results = [] + for auth_id in auth_ids: + hashes = self._get_event_reference_hashes_txn(txn, auth_id) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((auth_id, prev_hashes)) + + return results + def get_min_depth(self, room_id): return self.runInteraction( "get_min_depth", diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index 51695826a8..be1c72a775 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -63,3 +63,13 @@ CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( ); CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); + +CREATE TABLE IF NOT EXISTS event_auth( + event_id TEXT NOT NULL, + auth_id TEXT NOT NULL, + room_id TEXT NOT NULL, + CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id) +); + +CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); +CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); \ No newline at end of file diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index b4b3d5d7ea..84a49088a2 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -55,6 +55,18 @@ class SignatureStore(SQLBaseStore): or_ignore=True, ) + def get_event_reference_hashes(self, event_ids): + def f(txn): + return [ + self._get_event_reference_hashes_txn(txn, ev) + for ev in event_ids + ] + + return self.runInteraction( + "get_event_reference_hashes", + f + ) + def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. Args: -- cgit 1.4.1