diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 72290eb5a0..d8f351a675 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,13 +279,14 @@ class DataStore(RoomMemberStore, RoomStore,
)
if hasattr(event, "signatures"):
- signatures = event.signatures.get(event.origin, {})
-
- for key_id, signature_base64 in signatures.items():
- signature_bytes = decode_base64(signature_base64)
- self._store_event_origin_signature_txn(
- txn, event.event_id, event.origin, key_id, signature_bytes,
- )
+ logger.debug("sigs: %s", event.signatures)
+ for name, sigs in event.signatures.items():
+ for key_id, signature_base64 in sigs.items():
+ signature_bytes = decode_base64(signature_base64)
+ self._store_event_signature_txn(
+ txn, event.event_id, name, key_id,
+ signature_bytes,
+ )
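+ # Editor's note (illustrative shape, not part of the original change):
+ # event.signatures is expected to map signing server name -> key id ->
+ # base64 signature, e.g.
+ # {"example.org": {"ed25519:key1": "<base64-encoded signature>"}}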
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a1ee0318f6..670387b04a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -470,12 +470,15 @@ class SQLBaseStore(object):
select_event_sql = "SELECT * FROM events WHERE event_id = ?"
for i, ev in enumerate(events):
- signatures = self._get_event_origin_signatures_txn(
+ signatures = self._get_event_signatures_txn(
txn, ev.event_id,
)
ev.signatures = {
- k: encode_base64(v) for k, v in signatures.items()
+ n: {
+ k: encode_base64(v) for k, v in s.items()
+ }
+ for n, s in signatures.items()
}
prevs = self._get_prev_events_and_state(txn, ev.event_id)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index a027db3868..6c559f8f63 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -23,6 +23,14 @@ logger = logging.getLogger(__name__)
class EventFederationStore(SQLBaseStore):
+ """ Responsible for storing and serving up the various graphs associated
+ with an event. Including the main event graph and the auth chains for an
+ event.
+
+ Also has methods for getting the front (latest) and back (oldest) edges
+ of the event graphs. These are used to generate the parents for new events
+ and backfilling from another server respectively.
+ """
def get_auth_chain(self, event_id):
return self.runInteraction(
@@ -205,6 +213,8 @@ class EventFederationStore(SQLBaseStore):
return results
def get_min_depth(self, room_id):
+ """ For hte given room, get the minimum depth we have seen for it.
+ """
return self.runInteraction(
"get_min_depth",
self._get_min_depth_interaction,
@@ -240,6 +250,10 @@ class EventFederationStore(SQLBaseStore):
def _handle_prev_events(self, txn, outlier, event_id, prev_events,
room_id):
+ """
+ For the given event, update the event edges table and forward and
+ backward extremities tables.
+ """
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
@@ -267,8 +281,8 @@ class EventFederationStore(SQLBaseStore):
}
)
- # We only insert as a forward extremity the new pdu if there are
- # no other pdus that reference it as a prev pdu
+ # We only insert the new event as a forward extremity if there are no
+ # other events that reference it as a prev event
query = (
"INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
"SELECT ?, ? WHERE NOT EXISTS ("
@@ -284,7 +298,7 @@ class EventFederationStore(SQLBaseStore):
txn.execute(query, (event_id, room_id, event_id))
- # Insert all the prev_pdus as a backwards thing, they'll get
+ # Insert all the prev_events as backward extremities; they'll get
# deleted in a second if they're incorrect anyway.
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
@@ -299,7 +313,7 @@ class EventFederationStore(SQLBaseStore):
)
# Also delete from the backwards extremities table all ones that
- # reference pdus that we have already seen
+ # reference events that we have already seen
query = (
"DELETE FROM event_backward_extremities WHERE EXISTS ("
"SELECT 1 FROM events "
@@ -311,17 +325,14 @@ class EventFederationStore(SQLBaseStore):
txn.execute(query)
def get_backfill_events(self, room_id, event_list, limit):
- """Get a list of Events for a given topic that occured before (and
- including) the pdus in pdu_list. Return a list of max size `limit`.
+ """Get a list of Events for a given topic that occurred before (and
+ including) the events in event_list. Return a list of max size `limit`
Args:
txn
room_id (str)
event_list (list)
limit (int)
-
- Return:
- list: A list of PduTuples
"""
return self.runInteraction(
"get_backfill_events",
@@ -334,7 +345,6 @@ class EventFederationStore(SQLBaseStore):
room_id, repr(event_list), limit
)
- # We seed the pdu_results with the things from the pdu_list.
event_results = event_list
front = event_list
@@ -373,5 +383,4 @@ class EventFederationStore(SQLBaseStore):
front = new_front
event_results += new_front
- # We also want to update the `prev_pdus` attributes before returning.
return self._get_events_txn(txn, event_results)
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
index 5491c7ecec..4efa8a3e63 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/event_signatures.sql
@@ -37,15 +37,15 @@ CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
);
-CREATE TABLE IF NOT EXISTS event_origin_signatures (
+CREATE TABLE IF NOT EXISTS event_signatures (
event_id TEXT,
- origin TEXT,
+ signature_name TEXT,
key_id TEXT,
signature BLOB,
CONSTRAINT uniqueness UNIQUE (event_id, key_id)
);
-CREATE INDEX IF NOT EXISTS event_origin_signatures_id ON event_origin_signatures (
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
event_id
);
diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql
index b44c56b519..44f7aafb27 100644
--- a/synapse/storage/schema/state.sql
+++ b/synapse/storage/schema/state.sql
@@ -30,4 +30,17 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
CREATE TABLE IF NOT EXISTS event_to_state_groups(
event_id TEXT NOT NULL,
state_group INTEGER NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
+
+CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
+ state_group
+);
+CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
+ room_id, type, state_key
+);
+
+CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
+ event_id
);
\ No newline at end of file
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 84a49088a2..d90e08fff1 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -103,24 +103,30 @@ class SignatureStore(SQLBaseStore):
or_ignore=True,
)
-
- def _get_event_origin_signatures_txn(self, txn, event_id):
+ def _get_event_signatures_txn(self, txn, event_id):
"""Get all the signatures for a given PDU.
Args:
txn (cursor):
event_id (str): Id for the Event.
Returns:
- A dict of key_id -> signature_bytes.
+ A dict of sig name -> dict(key_id -> signature_bytes)
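+ For example (illustrative; the server name and key id are made up):
+ {"example.org": {"ed25519:key1": <raw signature bytes>}}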
"""
query = (
- "SELECT key_id, signature"
- " FROM event_origin_signatures"
+ "SELECT signature_name, key_id, signature"
+ " FROM event_signatures"
" WHERE event_id = ? "
)
txn.execute(query, (event_id, ))
- return dict(txn.fetchall())
+ rows = txn.fetchall()
+
+ res = {}
+
+ for name, key, sig in rows:
+ res.setdefault(name, {})[key] = sig
+
+ return res
- def _store_event_origin_signature_txn(self, txn, event_id, origin, key_id,
+ def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
signature_bytes):
"""Store a signature from the origin server for a PDU.
Args:
@@ -132,10 +138,10 @@ class SignatureStore(SQLBaseStore):
"""
self._simple_insert_txn(
txn,
- "event_origin_signatures",
+ "event_signatures",
{
"event_id": event_id,
- "origin": origin,
+ "signature_name": signature_name,
"key_id": key_id,
"signature": buffer(signature_bytes),
},
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 2f3a70b4e5..55ea567793 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,43 +14,71 @@
# limitations under the License.
from ._base import SQLBaseStore
-from twisted.internet import defer
class StateStore(SQLBaseStore):
+ """ Keeps track of the state at a given event.
+
+ This is done via the concept of `state groups`. Every event is assigned
+ a state group (identified by an arbitrary integer ID), which references a
+ collection of state events. The current state of an event is then the
+ collection of state events referenced by the event's state group.
+
+ Hence, every change in the current state causes a new state group to be
+ generated. However, if no change happens (e.g. we get a message event
+ with only one parent), it inherits the state group from its parent.
+
+ There are three tables:
+ * `state_groups`: Stores the group name, the first event in the group and
+ the room id.
+ * `event_to_state_groups`: Maps events to state groups.
+ * `state_groups_state`: Maps state group to state events.
+ """
- @defer.inlineCallbacks
def get_state_groups(self, event_ids):
- groups = set()
- for event_id in event_ids:
- group = yield self._simple_select_one_onecol(
- table="event_to_state_groups",
- keyvalues={"event_id": event_id},
- retcol="state_group",
- allow_none=True,
- )
- if group:
- groups.add(group)
-
- res = {}
- for group in groups:
- state_ids = yield self._simple_select_onecol(
- table="state_groups_state",
- keyvalues={"state_group": group},
- retcol="event_id",
- )
- state = []
- for state_id in state_ids:
- s = yield self.get_event(
- state_id,
+ """ Get the state groups for the given list of event_ids
+
+ The return value is a dict mapping group names to lists of events.
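+
+ For example (illustrative):
+ {<state group>: [<state event>, <state event>, ...]}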
+ """
+
+ def f(txn):
+ groups = set()
+ for event_id in event_ids:
+ group = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_to_state_groups",
+ keyvalues={"event_id": event_id},
+ retcol="state_group",
allow_none=True,
)
- if s:
- state.append(s)
+ if group:
+ groups.add(group)
- res[group] = state
+ res = {}
+ for group in groups:
+ state_ids = self._simple_select_onecol_txn(
+ txn,
+ table="state_groups_state",
+ keyvalues={"state_group": group},
+ retcol="event_id",
+ )
+ state = []
+ for state_id in state_ids:
+ s = self._get_events_txn(
+ txn,
+ [state_id],
+ )
+ if s:
+ state.extend(s)
+
+ res[group] = state
- defer.returnValue(res)
+ return res
+
+ return self.runInteraction(
+ "get_state_groups",
+ f,
+ )
def store_state_groups(self, event):
return self.runInteraction(
|