diff --git a/synapse/federation/pdu_codec.py b/synapse/federation/pdu_codec.py
index 2cd591410b..dccbccb85b 100644
--- a/synapse/federation/pdu_codec.py
+++ b/synapse/federation/pdu_codec.py
@@ -48,8 +48,8 @@ class PduCodec(object):
kwargs["room_id"] = pdu.context
kwargs["etype"] = pdu.pdu_type
kwargs["prev_events"] = [
- encode_event_id(i, o)
- for i, o in pdu.prev_pdus
+ (encode_event_id(i, o), s)
+ for i, o, s in pdu.prev_pdus
]
if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"):
@@ -82,7 +82,13 @@ class PduCodec(object):
d["pdu_type"] = event.type
if hasattr(event, "prev_events"):
- d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events]
+ def f(e, s):
+ i, o = decode_event_id(e, self.server_name)
+ return i, o, s
+ d["prev_pdus"] = [
+ f(e, s)
+ for e, s in event.prev_events
+ ]
if hasattr(event, "prev_state"):
d["prev_state_id"], d["prev_state_origin"] = (
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index cd6c35f194..787a01efc5 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,6 +16,8 @@
from twisted.internet import defer
from synapse.api.errors import LimitExceededError
+from synapse.util.async import run_on_reactor
+
class BaseHandler(object):
def __init__(self, hs):
@@ -45,6 +47,8 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _on_new_room_event(self, event, snapshot, extra_destinations=[],
extra_users=[], suppress_auth=False):
+ yield run_on_reactor()
+
snapshot.fill_out_prev_events(event)
yield self.state_handler.annotate_state_groups(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b575986fc3..5f86ed03fa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.federation.pdu_codec import PduCodec, encode_event_id
from synapse.api.errors import SynapseError
+from synapse.util.async import run_on_reactor
from twisted.internet import defer, reactor
@@ -81,6 +82,8 @@ class FederationHandler(BaseHandler):
processing.
"""
+ yield run_on_reactor()
+
pdu = self.pdu_codec.pdu_from_event(event)
if not hasattr(pdu, "destinations") or not pdu.destinations:
@@ -102,6 +105,8 @@ class FederationHandler(BaseHandler):
self.room_queues[event.room_id].append(pdu)
return
+ logger.debug("Processing event: %s", event.event_id)
+
if state:
state = [self.pdu_codec.event_from_pdu(p) for p in state]
@@ -216,58 +221,65 @@ class FederationHandler(BaseHandler):
assert(event.state_key == joinee)
assert(event.room_id == room_id)
- self.room_queues[room_id] = []
-
- event.event_id = self.event_factory.create_event_id()
- event.content = content
+ event.outlier = False
- state = yield self.replication_layer.send_join(
- target_host,
- self.pdu_codec.pdu_from_event(event)
- )
+ self.room_queues[room_id] = []
- state = [self.pdu_codec.event_from_pdu(p) for p in state]
+ try:
+ event.event_id = self.event_factory.create_event_id()
+ event.content = content
- logger.debug("do_invite_join state: %s", state)
+ state = yield self.replication_layer.send_join(
+ target_host,
+ self.pdu_codec.pdu_from_event(event)
+ )
- is_new_state = yield self.state_handler.annotate_state_groups(
- event,
- state=state
- )
+ state = [self.pdu_codec.event_from_pdu(p) for p in state]
- try:
- yield self.store.store_room(
- room_id=room_id,
- room_creator_user_id="",
- is_public=False
- )
- except:
- # FIXME
- pass
+ logger.debug("do_invite_join state: %s", state)
- for e in state:
- # FIXME: Auth these.
is_new_state = yield self.state_handler.annotate_state_groups(
- e,
+ event,
+ state=state
)
+ logger.debug("do_invite_join event: %s", event)
+
+ try:
+ yield self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id="",
+ is_public=False
+ )
+ except:
+ # FIXME
+ pass
+
+ for e in state:
+ # FIXME: Auth these.
+ e.outlier = True
+
+ yield self.state_handler.annotate_state_groups(
+ e,
+ )
+
+ yield self.store.persist_event(
+ e,
+ backfilled=False,
+ is_new_state=False
+ )
+
yield self.store.persist_event(
- e,
+ event,
backfilled=False,
- is_new_state=False
+ is_new_state=is_new_state
)
+ finally:
+ room_queue = self.room_queues[room_id]
+ del self.room_queues[room_id]
- yield self.store.persist_event(
- event,
- backfilled=False,
- is_new_state=is_new_state
- )
-
- room_queue = self.room_queues[room_id]
- del self.room_queues[room_id]
-
- for p in room_queue:
- yield self.on_receive_pdu(p, backfilled=False)
+ for p in room_queue:
+ yield self.on_receive_pdu(p, backfilled=False)
defer.returnValue(True)
diff --git a/synapse/state.py b/synapse/state.py
index cc6a7db96b..993c4f18d3 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -143,7 +143,9 @@ class StateHandler(object):
defer.returnValue(False)
return
- new_state = yield self.resolve_state_groups(event.prev_events)
+ new_state = yield self.resolve_state_groups(
+ [e for e, _ in event.prev_events]
+ )
event.old_state_events = copy.deepcopy(new_state)
@@ -157,12 +159,11 @@ class StateHandler(object):
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
- # FIXME: HACK!
- pdus = yield self.store.get_latest_pdus_in_context(room_id)
+ events = yield self.store.get_latest_events_in_room(room_id)
event_ids = [
- encode_event_id(pdu_id, origin)
- for pdu_id, origin, _ in pdus
+ e_id
+ for e_id, _ in events
]
res = yield self.resolve_state_groups(event_ids)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f89e518690..d75c366834 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -71,6 +71,7 @@ SCHEMAS = [
"state",
"signatures",
"event_edges",
+ "event_signatures",
]
@@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore,
"type",
"room_id",
"content",
- "unrecognized_keys"
+ "unrecognized_keys",
+ "depth",
],
allow_none=allow_none,
)
@@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore,
vals["unrecognized_keys"] = json.dumps(unrec)
try:
- self._simple_insert_txn(txn, "events", vals)
+ self._simple_insert_txn(
+ txn,
+ "events",
+ vals,
+ or_replace=(not outlier),
+ )
except:
logger.warn(
"Failed to persist, probably duplicate: %s",
@@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
- signatures = event.signatures.get(event.origin, {})
+ if hasattr(event, "signatures"):
+ signatures = event.signatures.get(event.origin, {})
- for key_id, signature_base64 in signatures.items():
- signature_bytes = decode_base64(signature_base64)
- self._store_event_origin_signature_txn(
- txn, event.event_id, key_id, signature_bytes,
- )
+ for key_id, signature_base64 in signatures.items():
+ signature_bytes = decode_base64(signature_base64)
+ self._store_event_origin_signature_txn(
+ txn, event.event_id, event.origin, key_id, signature_bytes,
+ )
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
@@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
# TODO
- (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
- self._store_event_reference_hash_txn(
- txn, event.event_id, ref_alg, ref_hash_bytes
- )
+ # (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
+ # self._store_event_reference_hash_txn(
+ # txn, event.event_id, ref_alg, ref_hash_bytes
+ # )
self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
@@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore,
"""
def _snapshot(txn):
membership_state = self._get_room_member(txn, user_id, room_id)
- prev_events = self._get_latest_events_in_room(
- txn, room_id
- )
+ prev_events = self._get_latest_events_in_room(txn, room_id)
if state_type is not None and state_key is not None:
prev_state_pdu = self._get_current_state_pdu(
@@ -469,12 +475,12 @@ class Snapshot(object):
return
event.prev_events = [
- (p_id, origin, hashes)
- for p_id, origin, hashes, _ in self.prev_events
+ (event_id, hashes)
+ for event_id, hashes, _ in self.prev_events
]
if self.prev_events:
- event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1
+ event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
else:
event.depth = 0
@@ -533,9 +539,10 @@ def prepare_database(db_conn):
db_conn.commit()
else:
- sql_script = "BEGIN TRANSACTION;"
+ sql_script = "BEGIN TRANSACTION;\n"
for sql_loc in SCHEMAS:
sql_script += read_schema(sql_loc)
+ sql_script += "\n"
sql_script += "COMMIT TRANSACTION;"
c.executescript(sql_script)
db_conn.commit()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 30732caa83..464b12f032 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,10 +19,12 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.api.events.utils import prune_event
from synapse.util.logutils import log_function
+from syutil.base64util import encode_base64
import collections
import copy
import json
+import sys
import time
@@ -67,6 +69,9 @@ class LoggingTransaction(object):
return self.txn.execute(
sql, *args, **kwargs
)
+ except:
+ logger.exception("[SQL FAIL] {%s}", self.name)
+ raise
finally:
end = time.clock() * 1000
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
@@ -85,14 +90,20 @@ class SQLBaseStore(object):
"""Wraps the .runInteraction() method on the underlying db_pool."""
def inner_func(txn, *args, **kwargs):
start = time.clock() * 1000
- txn_id = str(SQLBaseStore._TXN_ID)
- SQLBaseStore._TXN_ID += 1
+ txn_id = SQLBaseStore._TXN_ID
+
+ # We don't really need these to be unique, so lets stop it from
+ # growing really large.
+ self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
- name = "%s-%s" % (desc, txn_id, )
+ name = "%s-%x" % (desc, txn_id, )
transaction_logger.debug("[TXN START] {%s}", name)
try:
return func(LoggingTransaction(txn, name), *args, **kwargs)
+ except:
+ logger.exception("[TXN FAIL] {%s}", name)
+ raise
finally:
end = time.clock() * 1000
transaction_logger.debug(
@@ -189,7 +200,6 @@ class SQLBaseStore(object):
statement returns no rows
"""
return self._simple_selectupdate_one(
- "_simple_select_one",
table, keyvalues, retcols=retcols, allow_none=allow_none
)
@@ -215,11 +225,11 @@ class SQLBaseStore(object):
txn,
table=table,
keyvalues=keyvalues,
- retcols=retcol,
+ retcol=retcol,
)
if ret:
- return ret[retcol]
+ return ret[0]
else:
if allow_none:
return None
@@ -434,6 +444,17 @@ class SQLBaseStore(object):
sql = "SELECT * FROM events WHERE event_id = ?"
for ev in events:
+ signatures = self._get_event_origin_signatures_txn(
+ txn, ev.event_id,
+ )
+
+ ev.signatures = {
+ k: encode_base64(v) for k, v in signatures.items()
+ }
+
+ prev_events = self._get_latest_events_in_room(txn, ev.room_id)
+ ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events]
+
if hasattr(ev, "prev_state"):
# Load previous state_content.
# TODO: Should we be pulling this out above?
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 7688fc550f..5f94c31818 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
class EventFederationStore(SQLBaseStore):
+ def get_latest_events_in_room(self, room_id):
+ return self.runInteraction(
+ "get_latest_events_in_room",
+ self._get_latest_events_in_room,
+ room_id,
+ )
+
def _get_latest_events_in_room(self, txn, room_id):
self._simple_select_onecol_txn(
txn,
@@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore):
retcol="event_id",
)
+ sql = (
+ "SELECT e.event_id, e.depth FROM events as e "
+ "INNER JOIN event_forward_extremities as f "
+ "ON e.event_id = f.event_id "
+ "WHERE f.room_id = ?"
+ )
+
+ txn.execute(sql, (room_id, ))
+
results = []
- for pdu_id, origin, depth in txn.fetchall():
- hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin)
- sha256_bytes = hashes["sha256"]
- prev_hashes = {"sha256": encode_base64(sha256_bytes)}
- results.append((pdu_id, origin, prev_hashes, depth))
+ for event_id, depth in txn.fetchall():
+ hashes = self._get_prev_event_hashes_txn(txn, event_id)
+ prev_hashes = {
+ k: encode_base64(v) for k, v in hashes.items()
+ if k == "sha256"
+ }
+ results.append((event_id, prev_hashes, depth))
+
+ return results
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
@@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore):
def _handle_prev_events(self, txn, outlier, event_id, prev_events,
room_id):
- for e_id in prev_events:
+ for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
txn,
table="event_edges",
values={
"event_id": event_id,
- "prev_event": e_id,
+ "prev_event_id": e_id,
"room_id": room_id,
}
)
# Update the extremities table if this is not an outlier.
if not outlier:
- for e_id in prev_events:
+ for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_delete_txn(
txn,
@@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore):
# Insert all the prev_pdus as a backwards thing, they'll get
# deleted in a second if they're incorrect anyway.
- for e_id in prev_events:
+ for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
txn,
@@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore):
# Also delete from the backwards extremities table all ones that
# reference pdus that we have already seen
query = (
- "DELETE FROM %(event_back)s as b WHERE EXISTS ("
- "SELECT 1 FROM %(events)s AS events "
+ "DELETE FROM event_backward_extremities WHERE EXISTS ("
+ "SELECT 1 FROM events "
"WHERE "
- "b.event_id = events.event_id "
+ "event_backward_extremities.event_id = events.event_id "
"AND not events.outlier "
")"
- ) % {
- "event_back": "event_backward_extremities",
- "events": "events",
- }
+ )
txn.execute(query)
\ No newline at end of file
diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql
index 6a28314ece..e5f768c705 100644
--- a/synapse/storage/schema/event_edges.sql
+++ b/synapse/storage/schema/event_edges.sql
@@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities(
CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
---
+
CREATE TABLE IF NOT EXISTS event_backward_extremities(
event_id TEXT,
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities(
CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
---
+
CREATE TABLE IF NOT EXISTS event_edges(
event_id TEXT,
@@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges(
CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
---
CREATE TABLE IF NOT EXISTS room_depth(
@@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth(
);
CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
---
+
create TABLE IF NOT EXISTS event_destinations(
event_id TEXT,
@@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations(
);
CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
---
\ No newline at end of file
|