diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 74b4e23590..a1982dfbb5 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -79,6 +79,28 @@ class EventFederationStore(SQLBaseStore):
room_id,
)
+ def get_oldest_events_with_depth_in_room(self, room_id):
+ return self.runInteraction(
+ "get_oldest_events_with_depth_in_room",
+ self.get_oldest_events_with_depth_in_room_txn,
+ room_id,
+ )
+
+ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
+ sql = (
+ "SELECT b.event_id, MAX(e.depth) FROM events as e"
+ " INNER JOIN event_edges as g"
+ " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+ " INNER JOIN event_backward_extremities as b"
+ " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+ " WHERE b.room_id = ? AND g.is_state is ?"
+ " GROUP BY b.event_id"
+ )
+
+ txn.execute(sql, (room_id, False,))
+
+ return dict(txn.fetchall())
+
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
txn,
@@ -247,11 +269,13 @@ class EventFederationStore(SQLBaseStore):
do_insert = depth < min_depth if min_depth else True
if do_insert:
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
table="room_depth",
- values={
+ keyvalues={
"room_id": room_id,
+ },
+ values={
"min_depth": depth,
},
)
@@ -306,31 +330,27 @@ class EventFederationStore(SQLBaseStore):
txn.execute(query, (event_id, room_id))
- # Insert all the prev_events as a backwards thing, they'll get
- # deleted in a second if they're incorrect anyway.
- self._simple_insert_many_txn(
- txn,
- table="event_backward_extremities",
- values=[
- {
- "event_id": e_id,
- "room_id": room_id,
- }
- for e_id, _ in prev_events
- ],
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+ " )"
)
- # Also delete from the backwards extremities table all ones that
- # reference events that we have already seen
+ txn.executemany(query, [
+ (e_id, room_id, e_id, room_id, e_id, room_id, )
+ for e_id, _ in prev_events
+ ])
+
query = (
- "DELETE FROM event_backward_extremities WHERE EXISTS ("
- "SELECT 1 FROM events "
- "WHERE "
- "event_backward_extremities.event_id = events.event_id "
- "AND not events.outlier "
- ")"
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
)
- txn.execute(query)
+ txn.execute(query, (event_id, room_id))
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, room_id
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 626a5eaf6e..a5a6869079 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -135,19 +135,17 @@ class EventsStore(SQLBaseStore):
outlier = event.internal_metadata.is_outlier()
if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
- have_persisted = self._simple_select_one_onecol_txn(
+ have_persisted = self._simple_select_one_txn(
txn,
- table="event_json",
+ table="events",
keyvalues={"event_id": event.event_id},
- retcol="event_id",
+ retcols=["event_id", "outlier"],
allow_none=True,
)
@@ -162,7 +160,9 @@ class EventsStore(SQLBaseStore):
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
- if not outlier:
+ if not outlier and have_persisted["outlier"]:
+ self._store_state_groups_txn(txn, event, context)
+
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
@@ -182,6 +182,9 @@ class EventsStore(SQLBaseStore):
)
return
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
self._handle_prev_events(
txn,
outlier=outlier,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 280d4ad605..8045e17fd7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,11 +37,9 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
+from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
-from collections import namedtuple
-
import logging
@@ -55,76 +53,26 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
-class _StreamToken(namedtuple("_StreamToken", "topological stream")):
- """Tokens are positions between events. The token "s1" comes after event 1.
-
- s0 s1
- | |
- [0] V [1] V [2]
-
- Tokens can either be a point in the live event stream or a cursor going
- through historic events.
-
- When traversing the live event stream events are ordered by when they
- arrived at the homeserver.
-
- When traversing historic events the events are ordered by their depth in
- the event graph "topological_ordering" and then by when they arrived at the
- homeserver "stream_ordering".
-
- Live tokens start with an "s" followed by the "stream_ordering" id of the
- event it comes after. Historic tokens start with a "t" followed by the
- "topological_ordering" id of the event it comes after, follewed by "-",
- followed by the "stream_ordering" id of the event it comes after.
- """
- __slots__ = []
-
- @classmethod
- def parse(cls, string):
- try:
- if string[0] == 's':
- return cls(topological=None, stream=int(string[1:]))
- if string[0] == 't':
- parts = string[1:].split('-', 1)
- return cls(topological=int(parts[0]), stream=int(parts[1]))
- except:
- pass
- raise SynapseError(400, "Invalid token %r" % (string,))
-
- @classmethod
- def parse_stream_token(cls, string):
- try:
- if string[0] == 's':
- return cls(topological=None, stream=int(string[1:]))
- except:
- pass
- raise SynapseError(400, "Invalid token %r" % (string,))
-
- def __str__(self):
- if self.topological is not None:
- return "t%d-%d" % (self.topological, self.stream)
- else:
- return "s%d" % (self.stream,)
+def lower_bound(token):
+ if token.topological is None:
+ return "(%d < %s)" % (token.stream, "stream_ordering")
+ else:
+ return "(%d < %s OR (%d = %s AND %d < %s))" % (
+ token.topological, "topological_ordering",
+ token.topological, "topological_ordering",
+ token.stream, "stream_ordering",
+ )
- def lower_bound(self):
- if self.topological is None:
- return "(%d < %s)" % (self.stream, "stream_ordering")
- else:
- return "(%d < %s OR (%d = %s AND %d < %s))" % (
- self.topological, "topological_ordering",
- self.topological, "topological_ordering",
- self.stream, "stream_ordering",
- )
- def upper_bound(self):
- if self.topological is None:
- return "(%d >= %s)" % (self.stream, "stream_ordering")
- else:
- return "(%d > %s OR (%d = %s AND %d >= %s))" % (
- self.topological, "topological_ordering",
- self.topological, "topological_ordering",
- self.stream, "stream_ordering",
- )
+def upper_bound(token):
+ if token.topological is None:
+ return "(%d >= %s)" % (token.stream, "stream_ordering")
+ else:
+ return "(%d > %s OR (%d = %s AND %d >= %s))" % (
+ token.topological, "topological_ordering",
+ token.topological, "topological_ordering",
+ token.stream, "stream_ordering",
+ )
class StreamStore(SQLBaseStore):
@@ -139,8 +87,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- from_id = _StreamToken.parse_stream_token(from_key)
- to_id = _StreamToken.parse_stream_token(to_key)
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key:
defer.returnValue(([], to_key))
@@ -234,8 +182,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- from_id = _StreamToken.parse_stream_token(from_key)
- to_id = _StreamToken.parse_stream_token(to_key)
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
if from_key == to_key:
return defer.succeed(([], to_key))
@@ -288,17 +236,17 @@ class StreamStore(SQLBaseStore):
args = [False, room_id]
if direction == 'b':
order = "DESC"
- bounds = _StreamToken.parse(from_key).upper_bound()
+ bounds = upper_bound(RoomStreamToken.parse(from_key))
if to_key:
bounds = "%s AND %s" % (
- bounds, _StreamToken.parse(to_key).lower_bound()
+ bounds, lower_bound(RoomStreamToken.parse(to_key))
)
else:
order = "ASC"
- bounds = _StreamToken.parse(from_key).lower_bound()
+ bounds = lower_bound(RoomStreamToken.parse(from_key))
if to_key:
bounds = "%s AND %s" % (
- bounds, _StreamToken.parse(to_key).upper_bound()
+ bounds, upper_bound(RoomStreamToken.parse(to_key))
)
if int(limit) > 0:
@@ -333,7 +281,7 @@ class StreamStore(SQLBaseStore):
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
- next_token = str(_StreamToken(topo, toke))
+ next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
@@ -354,7 +302,7 @@ class StreamStore(SQLBaseStore):
with_feedback=False, from_token=None):
# TODO (erikj): Handle compressed feedback
- end_token = _StreamToken.parse_stream_token(end_token)
+ end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None:
sql = (
@@ -365,7 +313,7 @@ class StreamStore(SQLBaseStore):
" LIMIT ?"
)
else:
- from_token = _StreamToken.parse_stream_token(from_token)
+ from_token = RoomStreamToken.parse_stream_token(from_token)
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
@@ -395,7 +343,7 @@ class StreamStore(SQLBaseStore):
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
- start_token = str(_StreamToken(topo, toke))
+ start_token = str(RoomStreamToken(topo, toke))
token = (start_token, str(end_token))
else:
@@ -416,9 +364,25 @@ class StreamStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_room_events_max_id(self):
+ def get_room_events_max_id(self, direction='f'):
token = yield self._stream_id_gen.get_max_token(self)
- defer.returnValue("s%d" % (token,))
+ if direction != 'b':
+ defer.returnValue("s%d" % (token,))
+ else:
+ topo = yield self.runInteraction(
+ "_get_max_topological_txn", self._get_max_topological_txn
+ )
+ defer.returnValue("t%d-%d" % (topo, token))
+
+ def _get_max_topological_txn(self, txn):
+ txn.execute(
+ "SELECT MAX(topological_ordering) FROM events"
+ " WHERE outlier = ?",
+ (False,)
+ )
+
+ rows = txn.fetchall()
+ return rows[0][0] if rows else 0
@defer.inlineCallbacks
def _get_min_token(self):
@@ -439,5 +403,5 @@ class StreamStore(SQLBaseStore):
stream = row["stream_ordering"]
topo = event.depth
internal = event.internal_metadata
- internal.before = str(_StreamToken(topo, stream - 1))
- internal.after = str(_StreamToken(topo, stream))
+ internal.before = str(RoomStreamToken(topo, stream - 1))
+ internal.after = str(RoomStreamToken(topo, stream))
|