diff options
author | Erik Johnston <erik@matrix.org> | 2014-10-29 16:59:24 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-10-29 16:59:24 +0000 |
commit | e7858b6d7ef37849a3d2d5004743cdd21ec330a8 (patch) | |
tree | be917c0b61075a287f169ec5210a0b2dab563603 | |
parent | Don't reference PDU when persisting event (diff) | |
download | synapse-e7858b6d7ef37849a3d2d5004743cdd21ec330a8.tar.xz |
Start filling out and using new events tables
-rw-r--r-- | synapse/federation/pdu_codec.py | 12 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 4 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 90 | ||||
-rw-r--r-- | synapse/state.py | 11 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 45 | ||||
-rw-r--r-- | synapse/storage/_base.py | 33 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 49 | ||||
-rw-r--r-- | synapse/storage/schema/event_edges.sql | 8 |
8 files changed, 159 insertions, 93 deletions
diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py index 2cd591410b..dccbccb85b 100644 --- a/synapse/federation/pdu_codec.py +++ b/synapse/federation/pdu_codec.py @@ -48,8 +48,8 @@ class PduCodec(object): kwargs["room_id"] = pdu.context kwargs["etype"] = pdu.pdu_type kwargs["prev_events"] = [ - encode_event_id(i, o) - for i, o in pdu.prev_pdus + (encode_event_id(i, o), s) + for i, o, s in pdu.prev_pdus ] if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"): @@ -82,7 +82,13 @@ class PduCodec(object): d["pdu_type"] = event.type if hasattr(event, "prev_events"): - d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events] + def f(e, s): + i, o = decode_event_id(e, self.server_name) + return i, o, s + d["prev_pdus"] = [ + f(e, s) + for e, s in event.prev_events + ] if hasattr(event, "prev_state"): d["prev_state_id"], d["prev_state_origin"] = ( diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index cd6c35f194..787a01efc5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -16,6 +16,8 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError +from synapse.util.async import run_on_reactor + class BaseHandler(object): def __init__(self, hs): @@ -45,6 +47,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _on_new_room_event(self, event, snapshot, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b575986fc3..5f86ed03fa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.federation.pdu_codec import PduCodec, encode_event_id from synapse.api.errors import SynapseError +from synapse.util.async import run_on_reactor from twisted.internet import defer, reactor @@ -81,6 +82,8 @@ class FederationHandler(BaseHandler): processing. """ + yield run_on_reactor() + pdu = self.pdu_codec.pdu_from_event(event) if not hasattr(pdu, "destinations") or not pdu.destinations: @@ -102,6 +105,8 @@ class FederationHandler(BaseHandler): self.room_queues[event.room_id].append(pdu) return + logger.debug("Processing event: %s", event.event_id) + if state: state = [self.pdu_codec.event_from_pdu(p) for p in state] @@ -216,58 +221,65 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - self.room_queues[room_id] = [] - - event.event_id = self.event_factory.create_event_id() - event.content = content + event.outlier = False - state = yield self.replication_layer.send_join( - target_host, - self.pdu_codec.pdu_from_event(event) - ) + self.room_queues[room_id] = [] - state = [self.pdu_codec.event_from_pdu(p) for p in state] + try: + event.event_id = self.event_factory.create_event_id() + event.content = content - logger.debug("do_invite_join state: %s", state) + state = yield self.replication_layer.send_join( + target_host, + self.pdu_codec.pdu_from_event(event) + ) - is_new_state = yield self.state_handler.annotate_state_groups( - event, - state=state - ) + state = [self.pdu_codec.event_from_pdu(p) for p in state] - try: - yield self.store.store_room( - room_id=room_id, - room_creator_user_id="", - is_public=False - ) - except: - # FIXME - pass + logger.debug("do_invite_join state: %s", state) - for e in state: - # FIXME: Auth these. is_new_state = yield self.state_handler.annotate_state_groups( - e, + event, + state=state ) + logger.debug("do_invite_join event: %s", event) + + try: + yield self.store.store_room( + room_id=room_id, + room_creator_user_id="", + is_public=False + ) + except: + # FIXME + pass + + for e in state: + # FIXME: Auth these. + e.outlier = True + + yield self.state_handler.annotate_state_groups( + e, + ) + + yield self.store.persist_event( + e, + backfilled=False, + is_new_state=False + ) + yield self.store.persist_event( - e, + event, backfilled=False, - is_new_state=False + is_new_state=is_new_state ) + finally: + room_queue = self.room_queues[room_id] + del self.room_queues[room_id] - yield self.store.persist_event( - event, - backfilled=False, - is_new_state=is_new_state - ) - - room_queue = self.room_queues[room_id] - del self.room_queues[room_id] - - for p in room_queue: - yield self.on_receive_pdu(p, backfilled=False) + for p in room_queue: + yield self.on_receive_pdu(p, backfilled=False) defer.returnValue(True) diff --git a/synapse/state.py b/synapse/state.py index cc6a7db96b..993c4f18d3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -143,7 +143,9 @@ class StateHandler(object): defer.returnValue(False) return - new_state = yield self.resolve_state_groups(event.prev_events) + new_state = yield self.resolve_state_groups( + [e for e, _ in event.prev_events] + ) event.old_state_events = copy.deepcopy(new_state) @@ -157,12 +159,11 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - # FIXME: HACK! - pdus = yield self.store.get_latest_pdus_in_context(room_id) + events = yield self.store.get_latest_events_in_room(room_id) event_ids = [ - encode_event_id(pdu_id, origin) - for pdu_id, origin, _ in pdus + e_id + for e_id, _ in events ] res = yield self.resolve_state_groups(event_ids) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f89e518690..d75c366834 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -71,6 +71,7 @@ SCHEMAS = [ "state", "signatures", "event_edges", + "event_signatures", ] @@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore, "type", "room_id", "content", - "unrecognized_keys" + "unrecognized_keys", + "depth", ], allow_none=allow_none, ) @@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore, vals["unrecognized_keys"] = json.dumps(unrec) try: - self._simple_insert_txn(txn, "events", vals) + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + ) except: logger.warn( "Failed to persist, probably duplicate: %s", @@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore, } ) - signatures = event.signatures.get(event.origin, {}) + if hasattr(event, "signatures"): + signatures = event.signatures.get(event.origin, {}) - for key_id, signature_base64 in signatures.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_origin_signature_txn( - txn, event.event_id, key_id, signature_bytes, - ) + for key_id, signature_base64 in signatures.items(): + signature_bytes = decode_base64(signature_base64) + self._store_event_origin_signature_txn( + txn, event.event_id, event.origin, key_id, signature_bytes, + ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): @@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore, ) # TODO - (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) + # (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu) + # self._store_event_reference_hash_txn( + # txn, event.event_id, ref_alg, ref_hash_bytes + # ) self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) @@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore, """ def _snapshot(txn): membership_state = self._get_room_member(txn, user_id, room_id) - prev_events = self._get_latest_events_in_room( - txn, room_id - ) + prev_events = self._get_latest_events_in_room(txn, room_id) if state_type is not None and state_key is not None: prev_state_pdu = self._get_current_state_pdu( @@ -469,12 +475,12 @@ class Snapshot(object): return event.prev_events = [ - (p_id, origin, hashes) - for p_id, origin, hashes, _ in self.prev_events + (event_id, hashes) + for event_id, hashes, _ in self.prev_events ] if self.prev_events: - event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1 + event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 else: event.depth = 0 @@ -533,9 +539,10 @@ def prepare_database(db_conn): db_conn.commit() else: - sql_script = "BEGIN TRANSACTION;" + sql_script = "BEGIN TRANSACTION;\n" for sql_loc in SCHEMAS: sql_script += read_schema(sql_loc) + sql_script += "\n" sql_script += "COMMIT TRANSACTION;" c.executescript(sql_script) db_conn.commit() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 30732caa83..464b12f032 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,10 +19,12 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function +from syutil.base64util import encode_base64 import collections import copy import json +import sys import time @@ -67,6 +69,9 @@ class LoggingTransaction(object): return self.txn.execute( sql, *args, **kwargs ) + except: + logger.exception("[SQL FAIL] {%s}", self.name) + raise finally: end = time.clock() * 1000 sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) @@ -85,14 +90,20 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" def inner_func(txn, *args, **kwargs): start = time.clock() * 1000 - txn_id = str(SQLBaseStore._TXN_ID) - SQLBaseStore._TXN_ID += 1 + txn_id = SQLBaseStore._TXN_ID + + # We don't really need these to be unique, so lets stop it from + # growing really large. + self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) - name = "%s-%s" % (desc, txn_id, ) + name = "%s-%x" % (desc, txn_id, ) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) + except: + logger.exception("[TXN FAIL] {%s}", name) + raise finally: end = time.clock() * 1000 transaction_logger.debug( @@ -189,7 +200,6 @@ class SQLBaseStore(object): statement returns no rows """ return self._simple_selectupdate_one( - "_simple_select_one", table, keyvalues, retcols=retcols, allow_none=allow_none ) @@ -215,11 +225,11 @@ class SQLBaseStore(object): txn, table=table, keyvalues=keyvalues, - retcols=retcol, + retcol=retcol, ) if ret: - return ret[retcol] + return ret[0] else: if allow_none: return None @@ -434,6 +444,17 @@ class SQLBaseStore(object): sql = "SELECT * FROM events WHERE event_id = ?" for ev in events: + signatures = self._get_event_origin_signatures_txn( + txn, ev.event_id, + ) + + ev.signatures = { + k: encode_base64(v) for k, v in signatures.items() + } + + prev_events = self._get_latest_events_in_room(txn, ev.room_id) + ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events] + if hasattr(ev, "prev_state"): # Load previous state_content. # TODO: Should we be pulling this out above? diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 7688fc550f..5f94c31818 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,6 +24,13 @@ logger = logging.getLogger(__name__) class EventFederationStore(SQLBaseStore): + def get_latest_events_in_room(self, room_id): + return self.runInteraction( + "get_latest_events_in_room", + self._get_latest_events_in_room, + room_id, + ) + def _get_latest_events_in_room(self, txn, room_id): self._simple_select_onecol_txn( txn, @@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore): retcol="event_id", ) + sql = ( + "SELECT e.event_id, e.depth FROM events as e " + "INNER JOIN event_forward_extremities as f " + "ON e.event_id = f.event_id " + "WHERE f.room_id = ?" + ) + + txn.execute(sql, (room_id, )) + results = [] - for pdu_id, origin, depth in txn.fetchall(): - hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin) - sha256_bytes = hashes["sha256"] - prev_hashes = {"sha256": encode_base64(sha256_bytes)} - results.append((pdu_id, origin, prev_hashes, depth)) + for event_id, depth in txn.fetchall(): + hashes = self._get_prev_event_hashes_txn(txn, event_id) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((event_id, prev_hashes, depth)) + + return results def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( @@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore): def _handle_prev_events(self, txn, outlier, event_id, prev_events, room_id): - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( txn, table="event_edges", values={ "event_id": event_id, - "prev_event": e_id, + "prev_event_id": e_id, "room_id": room_id, } ) # Update the extremities table if this is not an outlier. if not outlier: - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_delete_txn( txn, @@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore): # Insert all the prev_pdus as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. - for e_id in prev_events: + for e_id, _ in prev_events: # TODO (erikj): This could be done as a bulk insert self._simple_insert_txn( txn, @@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore): # Also delete from the backwards extremities table all ones that # reference pdus that we have already seen query = ( - "DELETE FROM %(event_back)s as b WHERE EXISTS (" - "SELECT 1 FROM %(events)s AS events " + "DELETE FROM event_backward_extremities WHERE EXISTS (" + "SELECT 1 FROM events " "WHERE " - "b.event_id = events.event_id " + "event_backward_extremities.event_id = events.event_id " "AND not events.outlier " ")" - ) % { - "event_back": "event_backward_extremities", - "events": "events", - } + ) txn.execute(query) \ No newline at end of file diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index 6a28314ece..e5f768c705 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); --- + CREATE TABLE IF NOT EXISTS event_backward_extremities( event_id TEXT, @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities( CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); --- + CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT, @@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges( CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); --- CREATE TABLE IF NOT EXISTS room_depth( @@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth( ); CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); --- + create TABLE IF NOT EXISTS event_destinations( event_id TEXT, @@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations( ); CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); --- \ No newline at end of file |