diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py
index 580e591aca..68243d31d0 100644
--- a/synapse/federation/handler.py
+++ b/synapse/federation/handler.py
@@ -63,7 +63,7 @@ class FederationEventHandler(object):
Deferred: Resolved when it has successfully been queued for
processing.
"""
- yield self._fill_out_prev_events(event)
+ yield self.fill_out_prev_events(event)
pdu = self.pdu_codec.pdu_from_event(event)
@@ -129,7 +129,7 @@ class FederationEventHandler(object):
yield self.event_handler.on_receive(new_state_event)
@defer.inlineCallbacks
- def _fill_out_prev_events(self, event):
+ def fill_out_prev_events(self, event):
if hasattr(event, "prev_events"):
return
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 841ad8f132..9f78f3f9bd 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -43,9 +43,10 @@ class DataStore(RoomMemberStore, RoomStore,
def __init__(self, hs):
super(DataStore, self).__init__(hs)
self.event_factory = hs.get_event_factory()
+ self.hs = hs
@defer.inlineCallbacks
- def persist_event(self, event):
+ def persist_event(self, event, backfilled=False):
if event.type == RoomMemberEvent.TYPE:
yield self._store_room_member(event)
elif event.type == FeedbackEvent.TYPE:
@@ -57,7 +58,7 @@ class DataStore(RoomMemberStore, RoomStore,
elif event.type == RoomTopicEvent.TYPE:
yield self._store_room_topic(event)
- ret = yield self._store_event(event)
+ ret = yield self._store_event(event, backfilled)
defer.returnValue(ret)
@defer.inlineCallbacks
@@ -79,14 +80,23 @@ class DataStore(RoomMemberStore, RoomStore,
defer.returnValue(event)
@defer.inlineCallbacks
- def _store_event(self, event):
+ def _store_event(self, event, backfilled):
+ # FIXME (erikj): This should be removed when we start amalgamating
+ # event and pdu storage.
+ yield self.hs.get_federation().fill_out_prev_events(event)
+
vals = {
+ "topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": json.dumps(event.content),
+ "processed": True,
}
+ if backfilled:
+ vals["token_ordering"] = "-1"
+
unrec = {
k: v
for k, v in event.get_full_dict().items()
@@ -96,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore,
yield self._simple_insert("events", vals)
- if hasattr(event, "state_key"):
+ if not backfilled and hasattr(event, "state_key"):
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 2452890ea4..b0240e39af 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -14,12 +14,15 @@
*/
CREATE TABLE IF NOT EXISTS events(
- ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+ token_ordering INTEGER AUTOINCREMENT,
+ topological_ordering INTEGER NOT NULL,
event_id TEXT NOT NULL,
type TEXT NOT NULL,
- room_id TEXT,
- content TEXT,
- unrecognized_keys TEXT
+ room_id TEXT NOT NULL,
+ content TEXT NOT NULL,
+ unrecognized_keys TEXT,
+ processed BOOL NOT NULL,
+ CONSTRAINT ev_uniq UNIQUE (event_id)
);
CREATE TABLE IF NOT EXISTS state_events(
@@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS current_state_events(
room_id TEXT NOT NULL,
type TEXT NOT NULL,
state_key TEXT NOT NULL,
- CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+ CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
);
CREATE TABLE IF NOT EXISTS room_memberships(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf4b1682b6..f7968f576f 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -73,13 +73,14 @@ class StreamStore(SQLBaseStore):
# Constraints and ordering depend on direction.
if from_key < to_key:
sql += (
- "AND e.ordering > ? AND e.ordering < ? "
- "ORDER BY ordering ASC LIMIT %(limit)d "
+ "AND e.token_ordering > ? AND e.token_ordering < ? "
+ "ORDER BY token_ordering, rowid ASC LIMIT %(limit)d "
) % {"limit": limit}
else:
sql += (
- "AND e.ordering < ? AND e.ordering > ? "
- "ORDER BY ordering DESC LIMIT %(limit)d "
+ "AND e.token_ordering < ? "
+ "AND e.token_ordering > ? "
+ "ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d "
) % {"limit": int(limit)}
rows = yield self._execute_and_decode(
@@ -91,9 +92,9 @@ class StreamStore(SQLBaseStore):
if rows:
if from_key < to_key:
- key = max([r["ordering"] for r in rows])
+ key = max([r["token_ordering"] for r in rows])
else:
- key = min([r["ordering"] for r in rows])
+ key = min([r["token_ordering"] for r in rows])
else:
key = to_key
@@ -105,7 +106,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT * FROM events WHERE room_id = ? "
- "ORDER BY ordering DESC LIMIT ? "
+ "ORDER BY token_ordering, rowid DESC LIMIT ? "
)
rows = yield self._execute_and_decode(
@@ -120,7 +121,7 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_max_id(self):
res = yield self._execute_and_decode(
- "SELECT MAX(ordering) as m FROM events"
+ "SELECT MAX(token_ordering) as m FROM events"
)
if not res:
|