From 1c445f88f64beabf0bd9bec3950a4a4c0d529e8a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 15 Oct 2014 17:09:04 +0100 Subject: persist hashes and origin signatures for PDUs --- synapse/storage/pdu.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage/pdu.py') diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index d70467dcd6..9d624429b7 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -64,6 +64,11 @@ class PduStore(SQLBaseStore): for r in PduEdgesTable.decode_results(txn.fetchall()) ] + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + signatures = self._get_pdu_origin_signatures_txn( + txn, pdu_id, origin + ) + query = ( "SELECT %(fields)s FROM %(pdus)s as p " "LEFT JOIN %(state)s as s " @@ -80,7 +85,9 @@ class PduStore(SQLBaseStore): row = txn.fetchone() if row: - results.append(PduTuple(PduEntry(*row), edges)) + results.append(PduTuple( + PduEntry(*row), edges, hashes, signatures + )) return results @@ -908,7 +915,7 @@ This does not include a prev_pdus key. PduTuple = namedtuple( "PduTuple", - ("pdu_entry", "prev_pdu_list") + ("pdu_entry", "prev_pdu_list", "hashes", "signatures") ) """ This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent the `prev_pdus` key of a PDU. -- cgit 1.5.1 From bb04447c44036ebf3ae5dde7a4cc7a7909d50ef6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Oct 2014 23:25:12 +0100 Subject: Include hashes of previous pdus when referencing them --- synapse/api/events/__init__.py | 2 +- synapse/federation/pdu_codec.py | 13 ++++--------- synapse/federation/replication.py | 2 +- synapse/federation/units.py | 10 +++++++++- synapse/state.py | 4 ---- synapse/storage/__init__.py | 20 ++++++++++++++------ synapse/storage/pdu.py | 22 ++++++++++++++++------ synapse/storage/schema/signatures.sql | 16 ++++++++++++++++ synapse/storage/signatures.py | 31 +++++++++++++++++++++++++++++++ tests/federation/test_federation.py | 2 +- tests/federation/test_pdu_codec.py | 4 ++-- 11 files changed, 95 insertions(+), 31 deletions(-) (limited to 'synapse/storage/pdu.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index f66fea2904..a5a55742e0 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -65,13 +65,13 @@ class SynapseEvent(JsonEncodedObject): internal_keys = [ "is_state", - "prev_events", "depth", "destinations", "origin", "outlier", "power_level", "redacted", + "prev_pdus", ] required_keys = [ diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index bcac5f9ae8..11fd7264b3 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -45,9 +45,7 @@ class PduCodec(object): kwargs["event_id"] = encode_event_id(pdu.pdu_id, pdu.origin) kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type - kwargs["prev_events"] = [ - encode_event_id(p[0], p[1]) for p in pdu.prev_pdus - ] + kwargs["prev_pdus"] = pdu.prev_pdus if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): kwargs["prev_state"] = encode_event_id( @@ -78,11 +76,8 @@ class PduCodec(object): d["context"] = event.room_id d["pdu_type"] = event.type - if hasattr(event, "prev_events"): - d["prev_pdus"] = [ - decode_event_id(e, self.server_name) - for e in event.prev_events - ] + if hasattr(event, "prev_pdus"): + d["prev_pdus"] = event.prev_pdus if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( @@ -95,7 +90,7 @@ class PduCodec(object): kwargs = copy.deepcopy(event.unrecognized_keys) kwargs.update({ k: v for k, v in d.items() - if k not in ["event_id", "room_id", "type", "prev_events"] + if k not in ["event_id", "room_id", "type"] }) if "ts" not in kwargs: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9363ac7300..788a49b8e8 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -443,7 +443,7 @@ class ReplicationLayer(object): min_depth = yield self.store.get_min_depth_for_context(pdu.context) if min_depth and pdu.depth > min_depth: - for pdu_id, origin in pdu.prev_pdus: + for pdu_id, origin, hashes in pdu.prev_pdus: exists = yield self._get_persisted_pdu(pdu_id, origin) if not exists: diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 3518efb215..6a43007837 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -141,8 +141,16 @@ class Pdu(JsonEncodedObject): for kid, sig in pdu_tuple.signatures.items() } + prev_pdus = [] + for prev_pdu in pdu_tuple.prev_pdu_list: + prev_hashes = pdu_tuple.edge_hashes.get(prev_pdu, {}) + prev_hashes = { + alg: encode_base64(hsh) for alg, hsh in prev_hashes.items() + } + prev_pdus.append((prev_pdu[0], prev_pdu[1], prev_hashes)) + return Pdu( - prev_pdus=pdu_tuple.prev_pdu_list, + prev_pdus=prev_pdus, **args ) else: diff --git a/synapse/state.py b/synapse/state.py index 9db84c9b5c..bc6b928ec7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -72,10 +72,6 @@ class StateHandler(object): snapshot.fill_out_prev_events(event) - event.prev_events = [ - e for e in event.prev_events if e != event.event_id - ] - current_state = snapshot.prev_state_pdu if current_state: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b2a3f0b56c..af05b47932 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -177,6 +177,14 @@ class DataStore(RoomMemberStore, RoomStore, txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, ) + for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_pdu_hash_txn( + txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, + hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: @@ -352,6 +360,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_pdus = self._get_latest_pdus_in_context( txn, room_id ) + if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( txn, room_id, state_type, state_key @@ -401,17 +410,16 @@ class Snapshot(object): self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): + if hasattr(event, "prev_pdus"): return - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus + event.prev_pdus = [ + (p_id, origin, hashes) + for p_id, origin, hashes, _ in self.prev_pdus ] - event.prev_events = [e for e in es if e != event.event_id] - if self.prev_pdus: - event.depth = max([int(v) for _, _, v in self.prev_pdus]) + 1 + event.depth = max([int(v) for _, _, _, v in self.prev_pdus]) + 1 else: event.depth = 0 diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 9d624429b7..a423b42dbd 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -20,10 +20,13 @@ from ._base import SQLBaseStore, Table, JoinHelper from synapse.federation.units import Pdu from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 + from collections import namedtuple import logging + logger = logging.getLogger(__name__) @@ -64,6 +67,8 @@ class PduStore(SQLBaseStore): for r in PduEdgesTable.decode_results(txn.fetchall()) ] + edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin @@ -86,7 +91,7 @@ class PduStore(SQLBaseStore): row = txn.fetchone() if row: results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures + PduEntry(*row), edges, hashes, signatures, edge_hashes )) return results @@ -310,9 +315,14 @@ class PduStore(SQLBaseStore): (context, ) ) - results = txn.fetchall() + results = [] + for pdu_id, origin, depth in txn.fetchall(): + hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + sha256_bytes = hashes["sha256"] + prev_hashes = {"sha256": encode_base64(sha256_bytes)} + results.append((pdu_id, origin, prev_hashes, depth)) - return [(row[0], row[1], row[2]) for row in results] + return results @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): @@ -431,7 +441,7 @@ class PduStore(SQLBaseStore): "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" % PduForwardExtremitiesTable.table_name ) - txn.executemany(query, prev_pdus) + txn.executemany(query, list(p[:2] for p in prev_pdus)) # We only insert as a forward extremety the new pdu if there are no # other pdus that reference it as a prev pdu @@ -454,7 +464,7 @@ class PduStore(SQLBaseStore): # deleted in a second if they're incorrect anyway. txn.executemany( PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o in prev_pdus] + [(i, o, context) for i, o, _ in prev_pdus] ) # Also delete from the backwards extremities table all ones that @@ -915,7 +925,7 @@ This does not include a prev_pdus key. PduTuple = namedtuple( "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures") + ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") ) """ This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent the `prev_pdus` key of a PDU. diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index 86ee0f2377..a72c4dc35f 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -34,3 +34,19 @@ CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( CREATE INDEX IF NOT EXISTS pdu_origin_signatures_id ON pdu_origin_signatures ( pdu_id, origin ); + +CREATE TABLE IF NOT EXISTS pdu_edge_hashes( + pdu_id TEXT, + origin TEXT, + prev_pdu_id TEXT, + prev_origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE ( + pdu_id, origin, prev_pdu_id, prev_origin, algorithm + ) +); + +CREATE INDEX IF NOT EXISTS pdu_edge_hashes_id ON pdu_edge_hashes( + pdu_id, origin +); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1f0a680500..1147102489 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -88,3 +88,34 @@ class SignatureStore(SQLBaseStore): "signature": buffer(signature_bytes), }) + def _get_prev_pdu_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for previous PDUs of a PDU + Args: + txn (cursor): + pdu_id (str): Id of the PDU. + origin (str): Origin of the PDU. + Returns: + dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes. + """ + query = ( + "SELECT prev_pdu_id, prev_origin, algorithm, hash" + " FROM pdu_edge_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + results = {} + for prev_pdu_id, prev_origin, algorithm, hash_bytes in txn.fetchall(): + hashes = results.setdefault((prev_pdu_id, prev_origin), {}) + hashes[algorithm] = hash_bytes + return results + + def _store_prev_pdu_hash_txn(self, txn, pdu_id, origin, prev_pdu_id, + prev_origin, algorithm, hash_bytes): + self._simple_insert_txn(txn, "pdu_edge_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "prev_pdu_id": prev_pdu_id, + "prev_origin": prev_origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 03b2167cf7..eed50e6335 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -41,7 +41,7 @@ def make_pdu(prev_pdus=[], **kwargs): } pdu_fields.update(kwargs) - return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}) + return PduTuple(PduEntry(**pdu_fields), prev_pdus, {}, {}, {}) class FederationTestCase(unittest.TestCase): diff --git a/tests/federation/test_pdu_codec.py b/tests/federation/test_pdu_codec.py index 80851a4258..0ad8cf6641 100644 --- a/tests/federation/test_pdu_codec.py +++ b/tests/federation/test_pdu_codec.py @@ -88,7 +88,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) def test_pdu_from_event(self): @@ -144,7 +144,7 @@ class PduCodecTestCase(unittest.TestCase): self.assertEquals(pdu.context, event.room_id) self.assertEquals(pdu.is_state, event.is_state) self.assertEquals(pdu.depth, event.depth) - self.assertEquals(["alice@bob.com"], event.prev_events) + self.assertEquals(pdu.prev_pdus, event.prev_pdus) self.assertEquals(pdu.content, event.content) self.assertEquals(pdu.state_key, event.state_key) -- cgit 1.5.1 From c8f996e29ffd7055bc6521ea610fc12ff50502e5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Oct 2014 11:40:35 +0100 Subject: Hash the same content covered by the signature when referencing previous PDUs rather than reusing the PDU content hashes --- synapse/crypto/event_signing.py | 19 +++++++++++---- synapse/federation/pdu_codec.py | 6 +++-- synapse/storage/__init__.py | 9 ++++++- synapse/storage/pdu.py | 4 ++-- synapse/storage/schema/signatures.sql | 18 ++++++++++++-- synapse/storage/signatures.py | 44 +++++++++++++++++++++++++++++++---- 6 files changed, 84 insertions(+), 16 deletions(-) (limited to 'synapse/storage/pdu.py') diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index a115967c0a..32d60bd30a 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -24,15 +24,15 @@ from syutil.crypto.jsonsign import sign_json, verify_signed_json import hashlib -def hash_event_pdu(pdu, hash_algortithm=hashlib.sha256): - hashed = _compute_hash(pdu, hash_algortithm) +def add_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): + hashed = _compute_content_hash(pdu, hash_algorithm) pdu.hashes[hashed.name] = encode_base64(hashed.digest()) return pdu -def check_event_pdu_hash(pdu, hash_algorithm=hashlib.sha256): +def check_event_pdu_content_hash(pdu, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" - computed_hash = _compute_hash(pdu, hash_algortithm) + computed_hash = _compute_content_hash(pdu, hash_algortithm) if computed_hash.name not in pdu.hashes: raise Exception("Algorithm %s not in hashes %s" % ( computed_hash.name, list(pdu.hashes) @@ -45,7 +45,7 @@ def check_event_pdu_hash(pdu, hash_algorithm=hashlib.sha256): return message_hash_bytes == computed_hash.digest() -def _compute_hash(pdu, hash_algorithm): +def _compute_content_hash(pdu, hash_algorithm): pdu_json = pdu.get_dict() pdu_json.pop("meta", None) pdu_json.pop("signatures", None) @@ -54,6 +54,15 @@ def _compute_hash(pdu, hash_algorithm): return hash_algorithm(pdu_json_bytes) +def compute_pdu_event_reference_hash(pdu, hash_algorithm=hashlib.sha256): + tmp_pdu = Pdu(**pdu.get_dict()) + tmp_pdu = prune_pdu(tmp_pdu) + pdu_json = tmp_pdu.get_dict() + pdu_json_bytes = encode_canonical_json(pdu_json) + hashed = hash_algorithm(pdu_json_bytes) + return (hashed.name, hashed.digest()) + + def sign_event_pdu(pdu, signature_name, signing_key): tmp_pdu = Pdu(**pdu.get_dict()) tmp_pdu = prune_pdu(tmp_pdu) diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 11fd7264b3..7e574f451d 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -14,7 +14,9 @@ # limitations under the License. from .units import Pdu -from synapse.crypto.event_signing import hash_event_pdu, sign_event_pdu +from synapse.crypto.event_signing import ( + add_event_pdu_content_hash, sign_event_pdu +) import copy @@ -97,5 +99,5 @@ class PduCodec(object): kwargs["ts"] = int(self.clock.time_msec()) pdu = Pdu(**kwargs) - pdu = hash_event_pdu(pdu) + pdu = add_event_pdu_content_hash(pdu) return sign_event_pdu(pdu, self.server_name, self.signing_key) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index af05b47932..1738260cc1 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,8 @@ from .signatures import SignatureStore from syutil.base64util import decode_base64 +from synapse.crypto.event_signing import compute_pdu_event_reference_hash + import json import logging import os @@ -165,7 +167,7 @@ class DataStore(RoomMemberStore, RoomStore, for hash_alg, hash_base64 in pdu.hashes.items(): hash_bytes = decode_base64(hash_base64) - self._store_pdu_hash_txn( + self._store_pdu_content_hash_txn( txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, ) @@ -185,6 +187,11 @@ class DataStore(RoomMemberStore, RoomStore, hash_bytes ) + (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + self._store_pdu_reference_hash_txn( + txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes + ) + if pdu.is_state: self._persist_state_txn(txn, pdu.prev_pdus, cols) else: diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a423b42dbd..3a90c382f0 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -69,7 +69,7 @@ class PduStore(SQLBaseStore): edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) - hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_content_hashes_txn(txn, pdu_id, origin) signatures = self._get_pdu_origin_signatures_txn( txn, pdu_id, origin ) @@ -317,7 +317,7 @@ class PduStore(SQLBaseStore): results = [] for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_pdu_hashes_txn(txn, pdu_id, origin) + hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) sha256_bytes = hashes["sha256"] prev_hashes = {"sha256": encode_base64(sha256_bytes)} results.append((pdu_id, origin, prev_hashes, depth)) diff --git a/synapse/storage/schema/signatures.sql b/synapse/storage/schema/signatures.sql index a72c4dc35f..1c45a51bec 100644 --- a/synapse/storage/schema/signatures.sql +++ b/synapse/storage/schema/signatures.sql @@ -13,7 +13,7 @@ * limitations under the License. */ -CREATE TABLE IF NOT EXISTS pdu_hashes ( +CREATE TABLE IF NOT EXISTS pdu_content_hashes ( pdu_id TEXT, origin TEXT, algorithm TEXT, @@ -21,7 +21,21 @@ CREATE TABLE IF NOT EXISTS pdu_hashes ( CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) ); -CREATE INDEX IF NOT EXISTS pdu_hashes_id ON pdu_hashes (pdu_id, origin); +CREATE INDEX IF NOT EXISTS pdu_content_hashes_id ON pdu_content_hashes ( + pdu_id, origin +); + +CREATE TABLE IF NOT EXISTS pdu_reference_hashes ( + pdu_id TEXT, + origin TEXT, + algorithm TEXT, + hash BLOB, + CONSTRAINT uniqueness UNIQUE (pdu_id, origin, algorithm) +); + +CREATE INDEX IF NOT EXISTS pdu_reference_hashes_id ON pdu_reference_hashes ( + pdu_id, origin +); CREATE TABLE IF NOT EXISTS pdu_origin_signatures ( pdu_id TEXT, diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 1147102489..85eec7ffbe 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -21,7 +21,7 @@ from twisted.internet import defer class SignatureStore(SQLBaseStore): """Persistence for PDU signatures and hashes""" - def _get_pdu_hashes_txn(self, txn, pdu_id, origin): + def _get_pdu_content_hashes_txn(self, txn, pdu_id, origin): """Get all the hashes for a given PDU. Args: txn (cursor): @@ -32,13 +32,14 @@ class SignatureStore(SQLBaseStore): """ query = ( "SELECT algorithm, hash" - " FROM pdu_hashes" + " FROM pdu_content_hashes" " WHERE pdu_id = ? and origin = ?" ) txn.execute(query, (pdu_id, origin)) return dict(txn.fetchall()) - def _store_pdu_hash_txn(self, txn, pdu_id, origin, algorithm, hash_bytes): + def _store_pdu_content_hash_txn(self, txn, pdu_id, origin, algorithm, + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -47,13 +48,48 @@ class SignatureStore(SQLBaseStore): algorithm (str): Hashing algorithm. hash_bytes (bytes): Hash function output bytes. """ - self._simple_insert_txn(txn, "pdu_hashes", { + self._simple_insert_txn(txn, "pdu_content_hashes", { "pdu_id": pdu_id, "origin": origin, "algorithm": algorithm, "hash": buffer(hash_bytes), }) + def _get_pdu_reference_hashes_txn(self, txn, pdu_id, origin): + """Get all the hashes for a given PDU. + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + Returns: + A dict of algorithm -> hash. + """ + query = ( + "SELECT algorithm, hash" + " FROM pdu_reference_hashes" + " WHERE pdu_id = ? and origin = ?" + ) + txn.execute(query, (pdu_id, origin)) + return dict(txn.fetchall()) + + def _store_pdu_reference_hash_txn(self, txn, pdu_id, origin, algorithm, + hash_bytes): + """Store a hash for a PDU + Args: + txn (cursor): + pdu_id (str): Id for the PDU. + origin (str): origin of the PDU. + algorithm (str): Hashing algorithm. + hash_bytes (bytes): Hash function output bytes. + """ + self._simple_insert_txn(txn, "pdu_reference_hashes", { + "pdu_id": pdu_id, + "origin": origin, + "algorithm": algorithm, + "hash": buffer(hash_bytes), + }) + + def _get_pdu_origin_signatures_txn(self, txn, pdu_id, origin): """Get all the signatures for a given PDU. Args: -- cgit 1.5.1 From 5ffe5ab43fa090111a0141b04ce6342172f60724 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Oct 2014 18:56:42 +0100 Subject: Use state groups to get current state. Make join dance actually work. --- synapse/api/auth.py | 5 +++ synapse/federation/replication.py | 17 +++++++- synapse/federation/transport.py | 57 +++++++++++++++++++++++--- synapse/handlers/federation.py | 74 +++++++++++++++++++++++---------- synapse/handlers/message.py | 6 +-- synapse/rest/base.py | 5 +++ synapse/rest/events.py | 34 ++++++++++------ synapse/state.py | 86 +++++++++++++++++++++++++++------------ synapse/storage/pdu.py | 6 +++ synapse/storage/state.py | 3 ++ 10 files changed, 226 insertions(+), 67 deletions(-) (limited to 'synapse/storage/pdu.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d1eca791ab..50ce7eb4cd 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -22,6 +22,7 @@ from synapse.api.errors import AuthError, StoreError, Codes, SynapseError from synapse.api.events.room import ( RoomMemberEvent, RoomPowerLevelsEvent, RoomRedactionEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent, InviteJoinEvent, + RoomCreateEvent, ) from synapse.util.logutils import log_function @@ -59,6 +60,10 @@ class Auth(object): is_state = hasattr(event, "state_key") + if event.type == RoomCreateEvent.TYPE: + # FIXME + defer.returnValue(True) + if event.type == RoomMemberEvent.TYPE: yield self._can_replace_state(event) allowed = yield self.is_membership_change_allowed(event) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index d482193851..8c7d510ef6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -403,11 +403,18 @@ class ReplicationLayer(object): defer.returnValue( (404, "No handler for Query type '%s'" % (query_type, )) ) + @defer.inlineCallbacks def on_make_join_request(self, context, user_id): pdu = yield self.handler.on_make_join_request(context, user_id) defer.returnValue(pdu.get_dict()) + @defer.inlineCallbacks + def on_invite_request(self, origin, content): + pdu = Pdu(**content) + ret_pdu = yield self.handler.on_send_join_request(origin, pdu) + defer.returnValue((200, ret_pdu.get_dict())) + @defer.inlineCallbacks def on_send_join_request(self, origin, content): pdu = Pdu(**content) @@ -426,8 +433,9 @@ class ReplicationLayer(object): defer.returnValue(Pdu(**pdu_dict)) + @defer.inlineCallbacks def send_join(self, destination, pdu): - return self.transport_layer.send_join( + _, content = yield self.transport_layer.send_join( destination, pdu.context, pdu.pdu_id, @@ -435,6 +443,13 @@ class ReplicationLayer(object): pdu.get_dict(), ) + logger.debug("Got content: %s", content) + pdus = [Pdu(outlier=True, **p) for p in content.get("pdus", [])] + for pdu in pdus: + yield self._handle_new_pdu(destination, pdu) + + defer.returnValue(pdus) + @defer.inlineCallbacks @log_function def _get_persisted_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index a0d34fd24d..de64702e2f 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -229,13 +229,36 @@ class TransportLayer(object): pdu_id, ) - response = yield self.client.put_json( + code, content = yield self.client.put_json( destination=destination, path=path, data=content, ) - defer.returnValue(response) + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_join", code) + + defer.returnValue(json.loads(content)) + + @defer.inlineCallbacks + @log_function + def send_invite(self, destination, context, pdu_id, origin, content): + path = PREFIX + "/invite/%s/%s/%s" % ( + context, + origin, + pdu_id, + ) + + code, content = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) @defer.inlineCallbacks def _authenticate_request(self, request): @@ -297,9 +320,13 @@ class TransportLayer(object): @defer.inlineCallbacks def new_handler(request, *args, **kwargs): (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + try: + response = yield handler( + origin, content, request.args, *args, **kwargs + ) + except: + logger.exception("Callback failed") + raise defer.returnValue(response) return new_handler @@ -419,6 +446,17 @@ class TransportLayer(object): ) ) + self.server.register_path( + "PUT", + re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, pdu_origin, pdu_id: + self._on_invite_request( + origin, content, query, + ) + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -524,6 +562,15 @@ class TransportLayer(object): defer.returnValue((200, content)) + @defer.inlineCallbacks + @log_function + def _on_invite_request(self, origin, content, query): + content = yield self.request_handler.on_invite_request( + origin, content, + ) + + defer.returnValue((200, content)) + class TransportReceivedHandler(object): """ Callbacks used when we receive a transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0ae0541bd3..70790aaa72 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -62,6 +62,9 @@ class FederationHandler(BaseHandler): self.pdu_codec = PduCodec(hs) + # When joining a room we need to queue any events for that room up + self.room_queues = {} + @log_function @defer.inlineCallbacks def handle_new_event(self, event, snapshot): @@ -95,22 +98,25 @@ class FederationHandler(BaseHandler): logger.debug("Got event: %s", event.event_id) + if event.room_id in self.room_queues: + self.room_queues[event.room_id].append(pdu) + return + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] state = {(e.type, e.state_key): e for e in state} - yield self.state_handler.annotate_state_groups(event, state=state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) logger.debug("Event: %s", event) if not backfilled: yield self.auth.check(event, None, raises=True) - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - else: - is_new_state = False + is_new_state = is_new_state and not backfilled # TODO: Implement something in federation that allows us to # respond to PDU. @@ -211,6 +217,8 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) + self.room_queues[room_id] = [] + event.event_id = self.event_factory.create_event_id() event.content = content @@ -219,15 +227,14 @@ class FederationHandler(BaseHandler): self.pdu_codec.pdu_from_event(event) ) - # TODO (erikj): Time out here. - d = defer.Deferred() - self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d) - reactor.callLater(10, d.cancel) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield d - except defer.CancelledError: - raise SynapseError(500, "Unable to join remote room") + logger.debug("do_invite_join state: %s", state) + + is_new_state = yield self.state_handler.annotate_state_groups( + event, + state=state + ) try: yield self.store.store_room( @@ -239,6 +246,32 @@ class FederationHandler(BaseHandler): # FIXME pass + for e in state: + # FIXME: Auth these. + is_new_state = yield self.state_handler.annotate_state_groups( + e, + state=state + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + + yield self.store.persist_event( + event, + backfilled=False, + is_new_state=is_new_state + ) + + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] + + for p in room_queue: + p.outlier = True + yield self.on_receive_pdu(p, backfilled=False) + defer.returnValue(True) @defer.inlineCallbacks @@ -264,13 +297,9 @@ class FederationHandler(BaseHandler): def on_send_join_request(self, origin, pdu): event = self.pdu_codec.event_from_pdu(pdu) - yield self.state_handler.annotate_state_groups(event) + is_new_state= yield self.state_handler.annotate_state_groups(event) yield self.auth.check(event, None, raises=True) - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - # FIXME (erikj): All this is duplicated above :( yield self.store.persist_event( @@ -303,7 +332,10 @@ class FederationHandler(BaseHandler): yield self.replication_layer.send_pdu(new_pdu) - defer.returnValue(event.state_events.values()) + defer.returnValue([ + self.pdu_codec.pdu_from_event(e) + for e in event.state_events.values() + ]) @defer.inlineCallbacks def get_state_for_pdu(self, pdu_id, pdu_origin): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c2cbce151..4aaf97a83e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -199,7 +199,7 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_current_state( + data = yield self.state_handler.get_current_state( room_id, event_type, state_key ) defer.returnValue(data) @@ -238,7 +238,7 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # TODO: This is duplicating logic from snapshot_all_rooms - current_state = yield self.store.get_current_state(room_id) + current_state = yield self.state_handler.get_current_state(room_id) defer.returnValue([self.hs.serialize_event(c) for c in current_state]) @defer.inlineCallbacks @@ -315,7 +315,7 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), } - current_state = yield self.store.get_current_state( + current_state = yield self.state_handler.get_current_state( event.room_id ) d["state"] = [self.hs.serialize_event(c) for c in current_state] diff --git a/synapse/rest/base.py b/synapse/rest/base.py index 2e8e3fa7d4..dc784c1527 100644 --- a/synapse/rest/base.py +++ b/synapse/rest/base.py @@ -18,6 +18,11 @@ from synapse.api.urls import CLIENT_PREFIX from synapse.rest.transactions import HttpTransactionStore import re +import logging + + +logger = logging.getLogger(__name__) + def client_path_pattern(path_regex): """Creates a regex compiled client path with the correct client path diff --git a/synapse/rest/events.py b/synapse/rest/events.py index 097195d7cc..92ff5e5ca7 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -20,6 +20,12 @@ from synapse.api.errors import SynapseError from synapse.streams.config import PaginationConfig from synapse.rest.base import RestServlet, client_path_pattern +import logging + + +logger = logging.getLogger(__name__) + + class EventStreamRestServlet(RestServlet): PATTERN = client_path_pattern("/events$") @@ -29,18 +35,22 @@ class EventStreamRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): auth_user = yield self.auth.get_user_by_req(request) - - handler = self.handlers.event_stream_handler - pagin_config = PaginationConfig.from_request(request) - timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS - if "timeout" in request.args: - try: - timeout = int(request.args["timeout"][0]) - except ValueError: - raise SynapseError(400, "timeout must be in milliseconds.") - - chunk = yield handler.get_stream(auth_user.to_string(), pagin_config, - timeout=timeout) + try: + handler = self.handlers.event_stream_handler + pagin_config = PaginationConfig.from_request(request) + timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS + if "timeout" in request.args: + try: + timeout = int(request.args["timeout"][0]) + except ValueError: + raise SynapseError(400, "timeout must be in milliseconds.") + + chunk = yield handler.get_stream( + auth_user.to_string(), pagin_config, timeout=timeout + ) + except: + logger.exception("Event stream failed") + raise defer.returnValue((200, chunk)) diff --git a/synapse/state.py b/synapse/state.py index 8c4eeb8924..24685c6fb4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.federation.pdu_codec import encode_event_id, decode_event_id from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import encode_event_id from collections import namedtuple @@ -130,54 +131,89 @@ class StateHandler(object): defer.returnValue(is_new) @defer.inlineCallbacks + @log_function def annotate_state_groups(self, event, state=None): if state: event.state_group = None event.old_state_events = None - event.state_events = state + event.state_events = {(s.type, s.state_key): s for s in state} + defer.returnValue(False) + return + + if hasattr(event, "outlier") and event.outlier: + event.state_group = None + event.old_state_events = None + event.state_events = None + defer.returnValue(False) return + new_state = yield self.resolve_state_groups(event.prev_events) + + event.old_state_events = new_state + + if hasattr(event, "state_key"): + new_state[(event.type, event.state_key)] = event + + event.state_group = None + event.state_events = new_state + + defer.returnValue(hasattr(event, "state_key")) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + # FIXME: HACK! + pdus = yield self.store.get_latest_pdus_in_context(room_id) + + event_ids = [encode_event_id(p.pdu_id, p.origin) for p in pdus] + + res = self.resolve_state_groups(event_ids) + + if event_type: + defer.returnValue(res.get((event_type, state_key))) + return + + defer.returnValue(res.values()) + + @defer.inlineCallbacks + @log_function + def resolve_state_groups(self, event_ids): state_groups = yield self.store.get_state_groups( - event.prev_events + event_ids ) state = {} - state_sets = {} for group in state_groups: for s in group.state: - state.setdefault((s.type, s.state_key), []).append(s) - - state_sets.setdefault( + state.setdefault( (s.type, s.state_key), - set() - ).add(s.event_id) + {} + )[s.event_id] = s unconflicted_state = { - k: state[k].pop() for k, v in state_sets.items() - if len(v) == 1 + k: v.values()[0] for k, v in state.items() + if len(v.values()) == 1 } conflicted_state = { - k: state[k] - for k, v in state_sets.items() - if len(v) > 1 + k: v.values() + for k, v in state.items() + if len(v.values()) > 1 } - new_state = {} - new_state.update(unconflicted_state) - for key, events in conflicted_state.items(): - new_state[key] = yield self.resolve(events) + try: + new_state = {} + new_state.update(unconflicted_state) + for key, events in conflicted_state.items(): + new_state[key] = yield self._resolve_state_events(events) + except: + logger.exception("Failed to resolve state") + raise - event.old_state_events = new_state - - if hasattr(event, "state_key"): - new_state[(event.type, event.state_key)] = event - - event.state_group = None - event.state_events = new_state + defer.returnValue(new_state) @defer.inlineCallbacks - def resolve(self, events): + @log_function + def _resolve_state_events(self, events): curr_events = events new_powers_deferreds = [] diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index d70467dcd6..b1cb0185a6 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -277,6 +277,12 @@ class PduStore(SQLBaseStore): (context, depth) ) + def get_latest_pdus_in_context(self, context): + return self.runInteraction( + self._get_latest_pdus_in_context, + context + ) + def _get_latest_pdus_in_context(self, txn, context): """Get's a list of the most current pdus for a given context. This is used when we are sending a Pdu and need to fill out the `prev_pdus` diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9496c935a7..0aa979c9f0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -63,6 +63,9 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event): + if not event.state_events: + return + state_group = event.state_group if not state_group: state_group = self._simple_insert_txn( -- cgit 1.5.1 From da1dda3e1d9d3272527d35c23162c4baf7339d74 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Oct 2014 11:18:04 +0000 Subject: Add transaction level logging and timing information. Add a _simple_delete method --- synapse/storage/__init__.py | 3 +- synapse/storage/_base.py | 74 ++++++++++++++++++++++++++++++++--------- synapse/storage/directory.py | 1 + synapse/storage/pdu.py | 13 +++++++- synapse/storage/registration.py | 7 ++-- synapse/storage/room.py | 2 ++ synapse/storage/state.py | 1 + synapse/storage/stream.py | 5 ++- synapse/storage/transactions.py | 6 ++++ 9 files changed, 91 insertions(+), 21 deletions(-) (limited to 'synapse/storage/pdu.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 15a72d0cd7..a50e19349a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -109,6 +109,7 @@ class DataStore(RoomMemberStore, RoomStore, try: yield self.runInteraction( + "persist_event", self._persist_pdu_event_txn, pdu=pdu, event=event, @@ -394,7 +395,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_state_pdu=prev_state_pdu, ) - return self.runInteraction(_snapshot) + return self.runInteraction("snapshot_room", _snapshot) class Snapshot(object): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d3e8741889..1192216971 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -29,15 +29,17 @@ import time logger = logging.getLogger(__name__) sql_logger = logging.getLogger("synapse.storage.SQL") +transaction_logger = logging.getLogger("synapse.storage.txn") class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging to the .execute() method.""" - __slots__ = ["txn"] + __slots__ = ["txn", "name"] - def __init__(self, txn): + def __init__(self, txn, name): object.__setattr__(self, "txn", txn) + object.__setattr__(self, "name", name) def __getattr__(self, name): return getattr(self.txn, name) @@ -47,12 +49,15 @@ class LoggingTransaction(object): def execute(self, sql, *args, **kwargs): # TODO(paul): Maybe use 'info' and 'debug' for values? - sql_logger.debug("[SQL] %s", sql) + sql_logger.debug("[SQL] {%s} %s", self.name, sql) try: if args and args[0]: values = args[0] - sql_logger.debug("[SQL values] " + - ", ".join(("<%s>",) * len(values)), *values) + sql_logger.debug( + "[SQL values] {%s} " + ", ".join(("<%s>",) * len(values)), + self.name, + *values + ) except: # Don't let logging failures stop SQL from working pass @@ -64,10 +69,11 @@ class LoggingTransaction(object): ) finally: end = time.clock() * 1000 - sql_logger.debug("[SQL time] %f", end - start) + sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) class SQLBaseStore(object): + _TXN_ID = 0 def __init__(self, hs): self.hs = hs @@ -75,10 +81,24 @@ class SQLBaseStore(object): self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() - def runInteraction(self, func, *args, **kwargs): + def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" def inner_func(txn, *args, **kwargs): - return func(LoggingTransaction(txn), *args, **kwargs) + start = time.clock() * 1000 + txn_id = str(SQLBaseStore._TXN_ID) + SQLBaseStore._TXN_ID += 1 + + name = "%s-%s" % (desc, txn_id, ) + + transaction_logger.debug("[TXN START] {%s}", name) + try: + return func(LoggingTransaction(txn, name), *args, **kwargs) + finally: + end = time.clock() * 1000 + transaction_logger.debug( + "[TXN END] {%s} %f", + name, end - start + ) return self._db_pool.runInteraction(inner_func, *args, **kwargs) @@ -114,7 +134,7 @@ class SQLBaseStore(object): else: return cursor.fetchall() - return self.runInteraction(interaction) + return self.runInteraction("_execute", interaction) def _execute_and_decode(self, query, *args): return self._execute(self.cursor_to_dict, query, *args) @@ -131,6 +151,7 @@ class SQLBaseStore(object): or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( + "_simple_insert", self._simple_insert_txn, table, values, or_replace=or_replace, or_ignore=or_ignore, ) @@ -168,6 +189,7 @@ class SQLBaseStore(object): statement returns no rows """ return self._simple_selectupdate_one( + "_simple_select_one", table, keyvalues, retcols=retcols, allow_none=allow_none ) @@ -217,7 +239,7 @@ class SQLBaseStore(object): txn.execute(sql, keyvalues.values()) return txn.fetchall() - res = yield self.runInteraction(func) + res = yield self.runInteraction("_simple_select_onecol", func) defer.returnValue([r[0] for r in res]) @@ -240,7 +262,7 @@ class SQLBaseStore(object): txn.execute(sql, keyvalues.values()) return self.cursor_to_dict(txn) - return self.runInteraction(func) + return self.runInteraction("_simple_select_list", func) def _simple_update_one(self, table, keyvalues, updatevalues, retcols=None): @@ -308,7 +330,7 @@ class SQLBaseStore(object): raise StoreError(500, "More than one row matched") return ret - return self.runInteraction(func) + return self.runInteraction("_simple_selectupdate_one", func) def _simple_delete_one(self, table, keyvalues): """Executes a DELETE query on the named table, expecting to delete a @@ -320,7 +342,7 @@ class SQLBaseStore(object): """ sql = "DELETE FROM %s WHERE %s" % ( table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) def func(txn): @@ -329,7 +351,25 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self.runInteraction(func) + return self.runInteraction("_simple_delete_one", func) + + def _simple_delete(self, table, keyvalues): + """Executes a DELETE query on the named table. + + Args: + table : string giving the table name + keyvalues : dict of column names and values to select the row with + """ + + return self.runInteraction("_simple_delete", self._simple_delete_txn) + + def _simple_delete_txn(self, txn, table, keyvalues): + sql = "DELETE FROM %s WHERE %s" % ( + table, + " AND ".join("%s = ?" % (k, ) for k in keyvalues) + ) + + return txn.execute(sql, keyvalues.values()) def _simple_max_id(self, table): """Executes a SELECT query on the named table, expecting to return the @@ -347,7 +387,7 @@ class SQLBaseStore(object): return 0 return max_id - return self.runInteraction(func) + return self.runInteraction("_simple_max_id", func) def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items()}) @@ -371,7 +411,9 @@ class SQLBaseStore(object): ) def _parse_events(self, rows): - return self.runInteraction(self._parse_events_txn, rows) + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) def _parse_events_txn(self, txn, rows): events = [self._parse_event_from_row(r) for r in rows] diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 52373a28a6..d6a7113b9c 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -95,6 +95,7 @@ class DirectoryStore(SQLBaseStore): def delete_room_alias(self, room_alias): return self.runInteraction( + "delete_room_alias", self._delete_room_alias_txn, room_alias, ) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 9bdc831fd8..4a4341907b 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -47,7 +47,7 @@ class PduStore(SQLBaseStore): """ return self.runInteraction( - self._get_pdu_tuple, pdu_id, origin + "get_pdu", self._get_pdu_tuple, pdu_id, origin ) def _get_pdu_tuple(self, txn, pdu_id, origin): @@ -108,6 +108,7 @@ class PduStore(SQLBaseStore): """ return self.runInteraction( + "get_current_state_for_context", self._get_current_state_for_context, context ) @@ -156,6 +157,7 @@ class PduStore(SQLBaseStore): """ return self.runInteraction( + "mark_pdu_as_processed", self._mark_as_processed, pdu_id, pdu_origin ) @@ -165,6 +167,7 @@ class PduStore(SQLBaseStore): def get_all_pdus_from_context(self, context): """Get a list of all PDUs for a given context.""" return self.runInteraction( + "get_all_pdus_from_context", self._get_all_pdus_from_context, context, ) @@ -192,6 +195,7 @@ class PduStore(SQLBaseStore): list: A list of PduTuples """ return self.runInteraction( + "get_backfill", self._get_backfill, context, pdu_list, limit ) @@ -253,6 +257,7 @@ class PduStore(SQLBaseStore): context (str) """ return self.runInteraction( + "get_min_depth_for_context", self._get_min_depth_for_context, context ) @@ -291,6 +296,7 @@ class PduStore(SQLBaseStore): def get_latest_pdus_in_context(self, context): return self.runInteraction( + "get_latest_pdus_in_context", self._get_latest_pdus_in_context, context ) @@ -370,6 +376,7 @@ class PduStore(SQLBaseStore): """ return self.runInteraction( + "is_pdu_new", self._is_pdu_new, pdu_id=pdu_id, origin=origin, @@ -523,6 +530,7 @@ class StatePduStore(SQLBaseStore): def get_unresolved_state_tree(self, new_state_pdu): return self.runInteraction( + "get_unresolved_state_tree", self._get_unresolved_state_tree, new_state_pdu ) @@ -562,6 +570,7 @@ class StatePduStore(SQLBaseStore): def update_current_state(self, pdu_id, origin, context, pdu_type, state_key): return self.runInteraction( + "update_current_state", self._update_current_state, pdu_id, origin, context, pdu_type, state_key ) @@ -601,6 +610,7 @@ class StatePduStore(SQLBaseStore): """ return self.runInteraction( + "get_current_state_pdu", self._get_current_state_pdu, context, pdu_type, state_key ) @@ -660,6 +670,7 @@ class StatePduStore(SQLBaseStore): bool: True if the new_pdu clobbered the current state, False if not """ return self.runInteraction( + "handle_new_state", self._handle_new_state, new_pdu ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 719806f82b..a2ca6f9a69 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -62,8 +62,10 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if the user_id could not be registered. """ - yield self.runInteraction(self._register, user_id, token, - password_hash) + yield self.runInteraction( + "register", + self._register, user_id, token, password_hash + ) def _register(self, txn, user_id, token, password_hash): now = int(self.clock.time()) @@ -100,6 +102,7 @@ class RegistrationStore(SQLBaseStore): StoreError if no user was found. """ return self.runInteraction( + "get_user_by_token", self._query_for_auth, token ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8cd46334cf..7e48ce9cc3 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -150,6 +150,7 @@ class RoomStore(SQLBaseStore): def get_power_level(self, room_id, user_id): return self.runInteraction( + "get_power_level", self._get_power_level, room_id, user_id, ) @@ -183,6 +184,7 @@ class RoomStore(SQLBaseStore): def get_ops_levels(self, room_id): return self.runInteraction( + "get_ops_levels", self._get_ops_levels, room_id, ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0aa979c9f0..e08acd6404 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -59,6 +59,7 @@ class StateStore(SQLBaseStore): def store_state_groups(self, event): return self.runInteraction( + "store_state_groups", self._store_state_groups_txn, event ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d61f909939..8f7f61d29d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -309,7 +309,10 @@ class StreamStore(SQLBaseStore): defer.returnValue(ret) def get_room_events_max_id(self): - return self.runInteraction(self._get_room_events_max_id_txn) + return self.runInteraction( + "get_room_events_max_id", + self._get_room_events_max_id_txn + ) def _get_room_events_max_id_txn(self, txn): txn.execute( diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 2ba8e30efe..908014d38b 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -42,6 +42,7 @@ class TransactionStore(SQLBaseStore): """ return self.runInteraction( + "get_received_txn_response", self._get_received_txn_response, transaction_id, origin ) @@ -73,6 +74,7 @@ class TransactionStore(SQLBaseStore): """ return self.runInteraction( + "set_received_txn_response", self._set_received_txn_response, transaction_id, origin, code, response_dict ) @@ -106,6 +108,7 @@ class TransactionStore(SQLBaseStore): """ return self.runInteraction( + "prep_send_transaction", self._prep_send_transaction, transaction_id, destination, origin_server_ts, pdu_list ) @@ -161,6 +164,7 @@ class TransactionStore(SQLBaseStore): response_json (str) """ return self.runInteraction( + "delivered_txn", self._delivered_txn, transaction_id, destination, code, response_dict ) @@ -186,6 +190,7 @@ class TransactionStore(SQLBaseStore): list: A list of `ReceivedTransactionsTable.EntryType` """ return self.runInteraction( + "get_transactions_after", self._get_transactions_after, transaction_id, destination ) @@ -216,6 +221,7 @@ class TransactionStore(SQLBaseStore): list: A list of PduTuple """ return self.runInteraction( + "get_pdus_after_transaction", self._get_pdus_after_transaction, transaction_id, destination ) -- cgit 1.5.1 From bfa36a72b9a852130cc42fb9322f6596e89725a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Oct 2014 14:00:14 +0000 Subject: Remove PDU tables. --- synapse/federation/persistence.py | 70 --- synapse/federation/replication.py | 2 +- synapse/storage/__init__.py | 60 +-- synapse/storage/pdu.py | 949 -------------------------------------- synapse/storage/schema/pdu.sql | 106 ----- synapse/storage/transactions.py | 45 -- 6 files changed, 2 insertions(+), 1230 deletions(-) delete mode 100644 synapse/storage/pdu.py delete mode 100644 synapse/storage/schema/pdu.sql (limited to 'synapse/storage/pdu.py') diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 7043fcc504..a565375e68 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -32,76 +32,6 @@ import logging logger = logging.getLogger(__name__) -class PduActions(object): - """ Defines persistence actions that relate to handling PDUs. - """ - - def __init__(self, datastore): - self.store = datastore - - @log_function - def mark_as_processed(self, pdu): - """ Persist the fact that we have fully processed the given `Pdu` - - Returns: - Deferred - """ - return self.store.mark_pdu_as_processed(pdu.pdu_id, pdu.origin) - - @defer.inlineCallbacks - @log_function - def after_transaction(self, transaction_id, destination, origin): - """ Returns all `Pdu`s that we sent to the given remote home server - after a given transaction id. - - Returns: - Deferred: Results in a list of `Pdu`s - """ - results = yield self.store.get_pdus_after_transaction( - transaction_id, - destination - ) - - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @defer.inlineCallbacks - @log_function - def get_all_pdus_from_context(self, context): - results = yield self.store.get_all_pdus_from_context(context) - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @defer.inlineCallbacks - @log_function - def backfill(self, context, pdu_list, limit): - """ For a given list of PDU id and origins return the proceeding - `limit` `Pdu`s in the given `context`. - - Returns: - Deferred: Results in a list of `Pdu`s. - """ - results = yield self.store.get_backfill( - context, pdu_list, limit - ) - - defer.returnValue([Pdu.from_pdu_tuple(p) for p in results]) - - @log_function - def is_new(self, pdu): - """ When we receive a `Pdu` from a remote home server, we want to - figure out whether it is `new`, i.e. it is not some historic PDU that - we haven't seen simply because we haven't backfilled back that far. - - Returns: - Deferred: Results in a `bool` - """ - return self.store.is_pdu_new( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - depth=pdu.depth - ) - - class TransactionActions(object): """ Defines persistence actions that relate to handling Transactions. """ diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a0bd2e0572..159af4eed7 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -21,7 +21,7 @@ from twisted.internet import defer from .units import Transaction, Pdu, Edu -from .persistence import PduActions, TransactionActions +from .persistence import TransactionActions from synapse.util.logutils import log_function diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d75c366834..3faa571dd9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -37,7 +37,6 @@ from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore from .stream import StreamStore -from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore @@ -60,7 +59,6 @@ logger = logging.getLogger(__name__) SCHEMAS = [ "transactions", - "pdu", "users", "profiles", "presence", @@ -89,7 +87,7 @@ class _RollbackButIsFineException(Exception): class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, - PresenceStore, PduStore, StatePduStore, TransactionStore, + PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, EventFederationStore, ): @@ -150,68 +148,12 @@ class DataStore(RoomMemberStore, RoomStore, def _persist_pdu_event_txn(self, txn, pdu=None, event=None, backfilled=False, stream_ordering=None, is_new_state=True): - if pdu is not None: - self._persist_event_pdu_txn(txn, pdu) if event is not None: return self._persist_event_txn( txn, event, backfilled, stream_ordering, is_new_state=is_new_state, ) - def _persist_event_pdu_txn(self, txn, pdu): - cols = dict(pdu.__dict__) - unrec_keys = dict(pdu.unrecognized_keys) - del cols["hashes"] - del cols["signatures"] - del cols["content"] - del cols["prev_pdus"] - cols["content_json"] = json.dumps(pdu.content) - - unrec_keys.update({ - k: v for k, v in cols.items() - if k not in PdusTable.fields - }) - - cols["unrecognized_keys"] = json.dumps(unrec_keys) - - cols["ts"] = cols.pop("origin_server_ts") - - logger.debug("Persisting: %s", repr(cols)) - - for hash_alg, hash_base64 in pdu.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_pdu_content_hash_txn( - txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes, - ) - - signatures = pdu.signatures.get(pdu.origin, {}) - - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_pdu_origin_signature_txn( - txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes, - ) - - for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_pdu_hash_txn( - txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg, - hash_bytes - ) - - (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) - self._store_pdu_reference_hash_txn( - txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes - ) - - if pdu.is_state: - self._persist_state_txn(txn, pdu.prev_pdus, cols) - else: - self._persist_pdu_txn(txn, pdu.prev_pdus, cols) - - self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth) - @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, is_new_state=True): diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py deleted file mode 100644 index 4a4341907b..0000000000 --- a/synapse/storage/pdu.py +++ /dev/null @@ -1,949 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore, Table, JoinHelper - -from synapse.federation.units import Pdu -from synapse.util.logutils import log_function - -from syutil.base64util import encode_base64 - -from collections import namedtuple - -import logging - - -logger = logging.getLogger(__name__) - - -class PduStore(SQLBaseStore): - """A collection of queries for handling PDUs. - """ - - def get_pdu(self, pdu_id, origin): - """Given a pdu_id and origin, get a PDU. - - Args: - txn - pdu_id (str) - origin (str) - - Returns: - PduTuple: If the pdu does not exist in the database, returns None - """ - - return self.runInteraction( - "get_pdu", self._get_pdu_tuple, pdu_id, origin - ) - - def _get_pdu_tuple(self, txn, pdu_id, origin): - res = self._get_pdu_tuples(txn, [(pdu_id, origin)]) - return res[0] if res else None - - def _get_pdu_tuples(self, txn, pdu_id_tuples): - results = [] - for pdu_id, origin in pdu_id_tuples: - txn.execute( - PduEdgesTable.select_statement("pdu_id = ? AND origin = ?"), - (pdu_id, origin) - ) - - edges = [ - (r.prev_pdu_id, r.prev_origin) - for r in PduEdgesTable.decode_results(txn.fetchall()) - ] - - edge_hashes = self._get_prev_pdu_hashes_txn(txn, pdu_id, origin) - - hashes = self._get_pdu_content_hashes_txn(txn, pdu_id, origin) - signatures = self._get_pdu_origin_signatures_txn( - txn, pdu_id, origin - ) - - query = ( - "SELECT %(fields)s FROM %(pdus)s as p " - "LEFT JOIN %(state)s as s " - "ON p.pdu_id = s.pdu_id AND p.origin = s.origin " - "WHERE p.pdu_id = ? AND p.origin = ? " - ) % { - "fields": _pdu_state_joiner.get_fields( - PdusTable="p", StatePdusTable="s"), - "pdus": PdusTable.table_name, - "state": StatePdusTable.table_name, - } - - txn.execute(query, (pdu_id, origin)) - - row = txn.fetchone() - if row: - results.append(PduTuple( - PduEntry(*row), edges, hashes, signatures, edge_hashes - )) - - return results - - def get_current_state_for_context(self, context): - """Get a list of PDUs that represent the current state for a given - context - - Args: - context (str) - - Returns: - list: A list of PduTuples - """ - - return self.runInteraction( - "get_current_state_for_context", - self._get_current_state_for_context, - context - ) - - def _get_current_state_for_context(self, txn, context): - query = ( - "SELECT pdu_id, origin FROM %s WHERE context = ?" - % CurrentStateTable.table_name - ) - - logger.debug("get_current_state %s, Args=%s", query, context) - txn.execute(query, (context,)) - - res = txn.fetchall() - - logger.debug("get_current_state %d results", len(res)) - - return self._get_pdu_tuples(txn, res) - - def _persist_pdu_txn(self, txn, prev_pdus, cols): - """Inserts a (non-state) PDU into the database. - - Args: - txn, - prev_pdus (list) - **cols: The columns to insert into the PdusTable. - """ - entry = PdusTable.EntryType( - **{k: cols.get(k, None) for k in PdusTable.fields} - ) - - txn.execute(PdusTable.insert_statement(), entry) - - self._handle_prev_pdus( - txn, entry.outlier, entry.pdu_id, entry.origin, - prev_pdus, entry.context - ) - - def mark_pdu_as_processed(self, pdu_id, pdu_origin): - """Mark a received PDU as processed. - - Args: - txn - pdu_id (str) - pdu_origin (str) - """ - - return self.runInteraction( - "mark_pdu_as_processed", - self._mark_as_processed, pdu_id, pdu_origin - ) - - def _mark_as_processed(self, txn, pdu_id, pdu_origin): - txn.execute("UPDATE %s SET have_processed = 1" % PdusTable.table_name) - - def get_all_pdus_from_context(self, context): - """Get a list of all PDUs for a given context.""" - return self.runInteraction( - "get_all_pdus_from_context", - self._get_all_pdus_from_context, context, - ) - - def _get_all_pdus_from_context(self, txn, context): - query = ( - "SELECT pdu_id, origin FROM %s " - "WHERE context = ?" - ) % PdusTable.table_name - - txn.execute(query, (context,)) - - return self._get_pdu_tuples(txn, txn.fetchall()) - - def get_backfill(self, context, pdu_list, limit): - """Get a list of Pdus for a given topic that occured before (and - including) the pdus in pdu_list. Return a list of max size `limit`. - - Args: - txn - context (str) - pdu_list (list) - limit (int) - - Return: - list: A list of PduTuples - """ - return self.runInteraction( - "get_backfill", - self._get_backfill, context, pdu_list, limit - ) - - def _get_backfill(self, txn, context, pdu_list, limit): - logger.debug( - "backfill: %s, %s, %s", - context, repr(pdu_list), limit - ) - - # We seed the pdu_results with the things from the pdu_list. - pdu_results = pdu_list - - front = pdu_list - - query = ( - "SELECT prev_pdu_id, prev_origin FROM %(edges_table)s " - "WHERE context = ? AND pdu_id = ? AND origin = ? " - "LIMIT ?" - ) % { - "edges_table": PduEdgesTable.table_name, - } - - # We iterate through all pdu_ids in `front` to select their previous - # pdus. These are dumped in `new_front`. We continue until we reach the - # limit *or* new_front is empty (i.e., we've run out of things to - # select - while front and len(pdu_results) < limit: - - new_front = [] - for pdu_id, origin in front: - logger.debug( - "_backfill_interaction: i=%s, o=%s", - pdu_id, origin - ) - - txn.execute( - query, - (context, pdu_id, origin, limit - len(pdu_results)) - ) - - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got i=%s, o=%s", - *row - ) - new_front.append(row) - - front = new_front - pdu_results += new_front - - # We also want to update the `prev_pdus` attributes before returning. - return self._get_pdu_tuples(txn, pdu_results) - - def get_min_depth_for_context(self, context): - """Get the current minimum depth for a context - - Args: - txn - context (str) - """ - return self.runInteraction( - "get_min_depth_for_context", - self._get_min_depth_for_context, context - ) - - def _get_min_depth_for_context(self, txn, context): - return self._get_min_depth_interaction(txn, context) - - def _get_min_depth_interaction(self, txn, context): - txn.execute( - "SELECT min_depth FROM %s WHERE context = ?" - % ContextDepthTable.table_name, - (context,) - ) - - row = txn.fetchone() - - return row[0] if row else None - - def _update_min_depth_for_context_txn(self, txn, context, depth): - """Update the minimum `depth` of the given context, which is the line - on which we stop backfilling backwards. - - Args: - context (str) - depth (int) - """ - min_depth = self._get_min_depth_interaction(txn, context) - - do_insert = depth < min_depth if min_depth else True - - if do_insert: - txn.execute( - "INSERT OR REPLACE INTO %s (context, min_depth) " - "VALUES (?,?)" % ContextDepthTable.table_name, - (context, depth) - ) - - def get_latest_pdus_in_context(self, context): - return self.runInteraction( - "get_latest_pdus_in_context", - self._get_latest_pdus_in_context, - context - ) - - def _get_latest_pdus_in_context(self, txn, context): - """Get's a list of the most current pdus for a given context. This is - used when we are sending a Pdu and need to fill out the `prev_pdus` - key - - Args: - txn - context - """ - query = ( - "SELECT p.pdu_id, p.origin, p.depth FROM %(pdus)s as p " - "INNER JOIN %(forward)s as f ON p.pdu_id = f.pdu_id " - "AND f.origin = p.origin " - "WHERE f.context = ?" - ) % { - "pdus": PdusTable.table_name, - "forward": PduForwardExtremitiesTable.table_name, - } - - logger.debug("get_prev query: %s", query) - - txn.execute( - query, - (context, ) - ) - - results = [] - for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_pdu_reference_hashes_txn(txn, pdu_id, origin) - sha256_bytes = hashes["sha256"] - prev_hashes = {"sha256": encode_base64(sha256_bytes)} - results.append((pdu_id, origin, prev_hashes, depth)) - - return results - - @defer.inlineCallbacks - def get_oldest_pdus_in_context(self, context): - """Get a list of Pdus that we haven't backfilled beyond yet (and havent - seen). This list is used when we want to backfill backwards and is the - list we send to the remote server. - - Args: - txn - context (str) - - Returns: - list: A list of PduIdTuple. - """ - results = yield self._execute( - None, - "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" - % {"back": PduBackwardExtremitiesTable.table_name, }, - context - ) - - defer.returnValue([PduIdTuple(i, o) for i, o in results]) - - def is_pdu_new(self, pdu_id, origin, context, depth): - """For a given Pdu, try and figure out if it's 'new', i.e., if it's - not something we got randomly from the past, for example when we - request the current state of the room that will probably return a bunch - of pdus from before we joined. - - Args: - txn - pdu_id (str) - origin (str) - context (str) - depth (int) - - Returns: - bool - """ - - return self.runInteraction( - "is_pdu_new", - self._is_pdu_new, - pdu_id=pdu_id, - origin=origin, - context=context, - depth=depth - ) - - def _is_pdu_new(self, txn, pdu_id, origin, context, depth): - # If depth > min depth in back table, then we classify it as new. - # OR if there is nothing in the back table, then it kinda needs to - # be a new thing. - query = ( - "SELECT min(p.depth) FROM %(edges)s as e " - "INNER JOIN %(back)s as b " - "ON e.prev_pdu_id = b.pdu_id AND e.prev_origin = b.origin " - "INNER JOIN %(pdus)s as p " - "ON e.pdu_id = p.pdu_id AND p.origin = e.origin " - "WHERE p.context = ?" - ) % { - "pdus": PdusTable.table_name, - "edges": PduEdgesTable.table_name, - "back": PduBackwardExtremitiesTable.table_name, - } - - txn.execute(query, (context,)) - - min_depth, = txn.fetchone() - - if not min_depth or depth > int(min_depth): - logger.debug( - "is_new true: id=%s, o=%s, d=%s min_depth=%s", - pdu_id, origin, depth, min_depth - ) - return True - - # If this pdu is in the forwards table, then it also is a new one - query = ( - "SELECT * FROM %(forward)s WHERE pdu_id = ? AND origin = ?" - ) % { - "forward": PduForwardExtremitiesTable.table_name, - } - - txn.execute(query, (pdu_id, origin)) - - # Did we get anything? - if txn.fetchall(): - logger.debug( - "is_new true: id=%s, o=%s, d=%s was forward", - pdu_id, origin, depth - ) - return True - - logger.debug( - "is_new false: id=%s, o=%s, d=%s", - pdu_id, origin, depth - ) - - # FINE THEN. It's probably old. - return False - - @staticmethod - @log_function - def _handle_prev_pdus(txn, outlier, pdu_id, origin, prev_pdus, - context): - txn.executemany( - PduEdgesTable.insert_statement(), - [(pdu_id, origin, p[0], p[1], context) for p in prev_pdus] - ) - - # Update the extremities table if this is not an outlier. - if not outlier: - - # First, we delete the new one from the forwards extremities table. - query = ( - "DELETE FROM %s WHERE pdu_id = ? AND origin = ?" - % PduForwardExtremitiesTable.table_name - ) - txn.executemany(query, list(p[:2] for p in prev_pdus)) - - # We only insert as a forward extremety the new pdu if there are no - # other pdus that reference it as a prev pdu - query = ( - "INSERT INTO %(table)s (pdu_id, origin, context) " - "SELECT ?, ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(pdu_edges)s WHERE " - "prev_pdu_id = ? AND prev_origin = ?" - ")" - ) % { - "table": PduForwardExtremitiesTable.table_name, - "pdu_edges": PduEdgesTable.table_name - } - - logger.debug("query: %s", query) - - txn.execute(query, (pdu_id, origin, context, pdu_id, origin)) - - # Insert all the prev_pdus as a backwards thing, they'll get - # deleted in a second if they're incorrect anyway. - txn.executemany( - PduBackwardExtremitiesTable.insert_statement(), - [(i, o, context) for i, o, _ in prev_pdus] - ) - - # Also delete from the backwards extremities table all ones that - # reference pdus that we have already seen - query = ( - "DELETE FROM %(pdu_back)s WHERE EXISTS (" - "SELECT 1 FROM %(pdus)s AS pdus " - "WHERE " - "%(pdu_back)s.pdu_id = pdus.pdu_id " - "AND %(pdu_back)s.origin = pdus.origin " - "AND not pdus.outlier " - ")" - ) % { - "pdu_back": PduBackwardExtremitiesTable.table_name, - "pdus": PdusTable.table_name, - } - txn.execute(query) - - -class StatePduStore(SQLBaseStore): - """A collection of queries for handling state PDUs. - """ - - def _persist_state_txn(self, txn, prev_pdus, cols): - """Inserts a state PDU into the database - - Args: - txn, - prev_pdus (list) - **cols: The columns to insert into the PdusTable and StatePdusTable - """ - pdu_entry = PdusTable.EntryType( - **{k: cols.get(k, None) for k in PdusTable.fields} - ) - state_entry = StatePdusTable.EntryType( - **{k: cols.get(k, None) for k in StatePdusTable.fields} - ) - - logger.debug("Inserting pdu: %s", repr(pdu_entry)) - logger.debug("Inserting state: %s", repr(state_entry)) - - txn.execute(PdusTable.insert_statement(), pdu_entry) - txn.execute(StatePdusTable.insert_statement(), state_entry) - - self._handle_prev_pdus( - txn, - pdu_entry.outlier, pdu_entry.pdu_id, pdu_entry.origin, prev_pdus, - pdu_entry.context - ) - - def get_unresolved_state_tree(self, new_state_pdu): - return self.runInteraction( - "get_unresolved_state_tree", - self._get_unresolved_state_tree, new_state_pdu - ) - - @log_function - def _get_unresolved_state_tree(self, txn, new_pdu): - current = self._get_current_interaction( - txn, - new_pdu.context, new_pdu.pdu_type, new_pdu.state_key - ) - - ReturnType = namedtuple( - "StateReturnType", ["new_branch", "current_branch"] - ) - return_value = ReturnType([new_pdu], []) - - if not current: - logger.debug("get_unresolved_state_tree No current state.") - return (return_value, None) - - return_value.current_branch.append(current) - - enum_branches = self._enumerate_state_branches( - txn, new_pdu, current - ) - - missing_branch = None - for branch, prev_state, state in enum_branches: - if state: - return_value[branch].append(state) - else: - # We don't have prev_state :( - missing_branch = branch - break - - return (return_value, missing_branch) - - def update_current_state(self, pdu_id, origin, context, pdu_type, - state_key): - return self.runInteraction( - "update_current_state", - self._update_current_state, - pdu_id, origin, context, pdu_type, state_key - ) - - def _update_current_state(self, txn, pdu_id, origin, context, pdu_type, - state_key): - query = ( - "INSERT OR REPLACE INTO %(curr)s (%(fields)s) VALUES (%(qs)s)" - ) % { - "curr": CurrentStateTable.table_name, - "fields": CurrentStateTable.get_fields_string(), - "qs": ", ".join(["?"] * len(CurrentStateTable.fields)) - } - - query_args = CurrentStateTable.EntryType( - pdu_id=pdu_id, - origin=origin, - context=context, - pdu_type=pdu_type, - state_key=state_key - ) - - txn.execute(query, query_args) - - def get_current_state_pdu(self, context, pdu_type, state_key): - """For a given context, pdu_type, state_key 3-tuple, return what is - currently considered the current state. - - Args: - txn - context (str) - pdu_type (str) - state_key (str) - - Returns: - PduEntry - """ - - return self.runInteraction( - "get_current_state_pdu", - self._get_current_state_pdu, context, pdu_type, state_key - ) - - def _get_current_state_pdu(self, txn, context, pdu_type, state_key): - return self._get_current_interaction(txn, context, pdu_type, state_key) - - def _get_current_interaction(self, txn, context, pdu_type, state_key): - logger.debug( - "_get_current_interaction %s %s %s", - context, pdu_type, state_key - ) - - fields = _pdu_state_joiner.get_fields( - PdusTable="p", StatePdusTable="s") - - current_query = ( - "SELECT %(fields)s FROM %(state)s as s " - "INNER JOIN %(pdus)s as p " - "ON s.pdu_id = p.pdu_id AND s.origin = p.origin " - "INNER JOIN %(curr)s as c " - "ON s.pdu_id = c.pdu_id AND s.origin = c.origin " - "WHERE s.context = ? AND s.pdu_type = ? AND s.state_key = ? " - ) % { - "fields": fields, - "curr": CurrentStateTable.table_name, - "state": StatePdusTable.table_name, - "pdus": PdusTable.table_name, - } - - txn.execute( - current_query, - (context, pdu_type, state_key) - ) - - row = txn.fetchone() - - result = PduEntry(*row) if row else None - - if not result: - logger.debug("_get_current_interaction not found") - else: - logger.debug( - "_get_current_interaction found %s %s", - result.pdu_id, result.origin - ) - - return result - - def handle_new_state(self, new_pdu): - """Actually perform conflict resolution on the new_pdu on the - assumption we have all the pdus required to perform it. - - Args: - new_pdu - - Returns: - bool: True if the new_pdu clobbered the current state, False if not - """ - return self.runInteraction( - "handle_new_state", - self._handle_new_state, new_pdu - ) - - def _handle_new_state(self, txn, new_pdu): - logger.debug( - "handle_new_state %s %s", - new_pdu.pdu_id, new_pdu.origin - ) - - current = self._get_current_interaction( - txn, - new_pdu.context, new_pdu.pdu_type, new_pdu.state_key - ) - - is_current = False - - if (not current or not current.prev_state_id - or not current.prev_state_origin): - # Oh, we don't have any state for this yet. - is_current = True - elif (current.pdu_id == new_pdu.prev_state_id - and current.origin == new_pdu.prev_state_origin): - # Oh! A direct clobber. Just do it. - is_current = True - else: - ## - # Ok, now loop through until we get to a common ancestor. - max_new = int(new_pdu.power_level) - max_current = int(current.power_level) - - enum_branches = self._enumerate_state_branches( - txn, new_pdu, current - ) - for branch, prev_state, state in enum_branches: - if not state: - raise RuntimeError( - "Could not find state_pdu %s %s" % - ( - prev_state.prev_state_id, - prev_state.prev_state_origin - ) - ) - - if branch == 0: - max_new = max(int(state.depth), max_new) - else: - max_current = max(int(state.depth), max_current) - - is_current = max_new > max_current - - if is_current: - logger.debug("handle_new_state make current") - - # Right, this is a new thing, so woo, just insert it. - txn.execute( - "INSERT OR REPLACE INTO %(curr)s (%(fields)s) VALUES (%(qs)s)" - % { - "curr": CurrentStateTable.table_name, - "fields": CurrentStateTable.get_fields_string(), - "qs": ", ".join(["?"] * len(CurrentStateTable.fields)) - }, - CurrentStateTable.EntryType( - *(new_pdu.__dict__[k] for k in CurrentStateTable.fields) - ) - ) - else: - logger.debug("handle_new_state not current") - - logger.debug("handle_new_state done") - - return is_current - - @log_function - def _enumerate_state_branches(self, txn, pdu_a, pdu_b): - branch_a = pdu_a - branch_b = pdu_b - - while True: - if (branch_a.pdu_id == branch_b.pdu_id - and branch_a.origin == branch_b.origin): - # Woo! We found a common ancestor - logger.debug("_enumerate_state_branches Found common ancestor") - break - - do_branch_a = ( - hasattr(branch_a, "prev_state_id") and - branch_a.prev_state_id - ) - - do_branch_b = ( - hasattr(branch_b, "prev_state_id") and - branch_b.prev_state_id - ) - - logger.debug( - "do_branch_a=%s, do_branch_b=%s", - do_branch_a, do_branch_b - ) - - if do_branch_a and do_branch_b: - do_branch_a = int(branch_a.depth) > int(branch_b.depth) - - if do_branch_a: - pdu_tuple = PduIdTuple( - branch_a.prev_state_id, - branch_a.prev_state_origin - ) - - prev_branch = branch_a - - logger.debug("getting branch_a prev %s", pdu_tuple) - branch_a = self._get_pdu_tuple(txn, *pdu_tuple) - if branch_a: - branch_a = Pdu.from_pdu_tuple(branch_a) - - logger.debug("branch_a=%s", branch_a) - - yield (0, prev_branch, branch_a) - - if not branch_a: - break - elif do_branch_b: - pdu_tuple = PduIdTuple( - branch_b.prev_state_id, - branch_b.prev_state_origin - ) - - prev_branch = branch_b - - logger.debug("getting branch_b prev %s", pdu_tuple) - branch_b = self._get_pdu_tuple(txn, *pdu_tuple) - if branch_b: - branch_b = Pdu.from_pdu_tuple(branch_b) - - logger.debug("branch_b=%s", branch_b) - - yield (1, prev_branch, branch_b) - - if not branch_b: - break - else: - break - - -class PdusTable(Table): - table_name = "pdus" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "ts", - "depth", - "is_state", - "content_json", - "unrecognized_keys", - "outlier", - "have_processed", - ] - - EntryType = namedtuple("PdusEntry", fields) - - -class PduDestinationsTable(Table): - table_name = "pdu_destinations" - - fields = [ - "pdu_id", - "origin", - "destination", - "delivered_ts", - ] - - EntryType = namedtuple("PduDestinationsEntry", fields) - - -class PduEdgesTable(Table): - table_name = "pdu_edges" - - fields = [ - "pdu_id", - "origin", - "prev_pdu_id", - "prev_origin", - "context" - ] - - EntryType = namedtuple("PduEdgesEntry", fields) - - -class PduForwardExtremitiesTable(Table): - table_name = "pdu_forward_extremities" - - fields = [ - "pdu_id", - "origin", - "context", - ] - - EntryType = namedtuple("PduForwardExtremitiesEntry", fields) - - -class PduBackwardExtremitiesTable(Table): - table_name = "pdu_backward_extremities" - - fields = [ - "pdu_id", - "origin", - "context", - ] - - EntryType = namedtuple("PduBackwardExtremitiesEntry", fields) - - -class ContextDepthTable(Table): - table_name = "context_depth" - - fields = [ - "context", - "min_depth", - ] - - EntryType = namedtuple("ContextDepthEntry", fields) - - -class StatePdusTable(Table): - table_name = "state_pdus" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "state_key", - "power_level", - "prev_state_id", - "prev_state_origin", - ] - - EntryType = namedtuple("StatePdusEntry", fields) - - -class CurrentStateTable(Table): - table_name = "current_state" - - fields = [ - "pdu_id", - "origin", - "context", - "pdu_type", - "state_key", - ] - - EntryType = namedtuple("CurrentStateEntry", fields) - -_pdu_state_joiner = JoinHelper(PdusTable, StatePdusTable) - - -# TODO: These should probably be put somewhere more sensible -PduIdTuple = namedtuple("PduIdTuple", ("pdu_id", "origin")) - -PduEntry = _pdu_state_joiner.EntryType -""" We are always interested in the join of the PdusTable and StatePdusTable, -rather than just the PdusTable. - -This does not include a prev_pdus key. -""" - -PduTuple = namedtuple( - "PduTuple", - ("pdu_entry", "prev_pdu_list", "hashes", "signatures", "edge_hashes") -) -""" This is a tuple of a `PduEntry` and a list of `PduIdTuple` that represent -the `prev_pdus` key of a PDU. -""" diff --git a/synapse/storage/schema/pdu.sql b/synapse/storage/schema/pdu.sql deleted file mode 100644 index 16e111a56c..0000000000 --- a/synapse/storage/schema/pdu.sql +++ /dev/null @@ -1,106 +0,0 @@ -/* Copyright 2014 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ --- Stores pdus and their content -CREATE TABLE IF NOT EXISTS pdus( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - ts INTEGER, - depth INTEGER DEFAULT 0 NOT NULL, - is_state BOOL, - content_json TEXT, - unrecognized_keys TEXT, - outlier BOOL NOT NULL, - have_processed BOOL, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) -); - --- Stores what the current state pdu is for a given (context, pdu_type, key) tuple -CREATE TABLE IF NOT EXISTS state_pdus( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - state_key TEXT, - power_level TEXT, - prev_state_id TEXT, - prev_state_origin TEXT, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) - CONSTRAINT prev_pdu_id_origin UNIQUE (prev_state_id, prev_state_origin) -); - -CREATE TABLE IF NOT EXISTS current_state( - pdu_id TEXT, - origin TEXT, - context TEXT, - pdu_type TEXT, - state_key TEXT, - CONSTRAINT pdu_id_origin UNIQUE (pdu_id, origin) - CONSTRAINT uniqueness UNIQUE (context, pdu_type, state_key) ON CONFLICT REPLACE -); - --- Stores where each pdu we want to send should be sent and the delivery status. -create TABLE IF NOT EXISTS pdu_destinations( - pdu_id TEXT, - origin TEXT, - destination TEXT, - delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, destination) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_forward_extremities( - pdu_id TEXT, - origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, context) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_backward_extremities( - pdu_id TEXT, - origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, context) ON CONFLICT REPLACE -); - -CREATE TABLE IF NOT EXISTS pdu_edges( - pdu_id TEXT, - origin TEXT, - prev_pdu_id TEXT, - prev_origin TEXT, - context TEXT, - CONSTRAINT uniqueness UNIQUE (pdu_id, origin, prev_pdu_id, prev_origin, context) -); - -CREATE TABLE IF NOT EXISTS context_depth( - context TEXT, - min_depth INTEGER, - CONSTRAINT uniqueness UNIQUE (context) -); - -CREATE INDEX IF NOT EXISTS context_depth_context ON context_depth(context); - - -CREATE INDEX IF NOT EXISTS pdu_id ON pdus(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS dests_id ON pdu_destinations (pdu_id, origin); --- CREATE INDEX IF NOT EXISTS dests ON pdu_destinations (destination); - -CREATE INDEX IF NOT EXISTS pdu_extrem_context ON pdu_forward_extremities(context); -CREATE INDEX IF NOT EXISTS pdu_extrem_id ON pdu_forward_extremities(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS pdu_edges_id ON pdu_edges(pdu_id, origin); - -CREATE INDEX IF NOT EXISTS pdu_b_extrem_context ON pdu_backward_extremities(context); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6624348fd0..ea67900788 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore, Table -from .pdu import PdusTable from collections import namedtuple @@ -207,50 +206,6 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) - def get_pdus_after_transaction(self, transaction_id, destination): - """For a given local transaction_id that we sent to a given destination - home server, return a list of PDUs that were sent to that destination - after it. - - Args: - txn - transaction_id (str) - destination (str) - - Returns - list: A list of PduTuple - """ - return self.runInteraction( - "get_pdus_after_transaction", - self._get_pdus_after_transaction, - transaction_id, destination - ) - - def _get_pdus_after_transaction(self, txn, transaction_id, destination): - - # Query that first get's all transaction_ids with an id greater than - # the one given from the `sent_transactions` table. Then JOIN on this - # from the `tx->pdu` table to get a list of (pdu_id, origin) that - # specify the pdus that were sent in those transactions. - query = ( - "SELECT pdu_id, pdu_origin FROM %(tx_pdu)s as tp " - "INNER JOIN %(sent_tx)s as st " - "ON tp.transaction_id = st.transaction_id " - "AND tp.destination = st.destination " - "WHERE st.id > (" - "SELECT id FROM %(sent_tx)s " - "WHERE transaction_id = ? AND destination = ?" - ) % { - "tx_pdu": TransactionsToPduTable.table_name, - "sent_tx": SentTransactions.table_name, - } - - txn.execute(query, (transaction_id, destination)) - - pdus = PdusTable.decode_results(txn.fetchall()) - - return self._get_pdu_tuples(txn, pdus) - class ReceivedTransactionsTable(Table): table_name = "received_transactions" -- cgit 1.5.1