diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 2590b52f73..cc27ec3804 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -58,6 +58,7 @@ from twisted.internet import defer
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -71,6 +72,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -355,6 +357,24 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
)
args.extend(event_filter.related_by_rel_types)

+ if event_filter.rel_types:
+ clauses.append(
+ "(%s)"
+ % " OR ".join(
+ "event_relation.relation_type = ?" for _ in event_filter.rel_types
+ )
+ )
+ args.extend(event_filter.rel_types)
+
+ if event_filter.not_rel_types:
+ clauses.append(
+ "((%s) OR event_relation.relation_type IS NULL)"
+ % " AND ".join(
+ "event_relation.relation_type != ?" for _ in event_filter.not_rel_types
+ )
+ )
+ args.extend(event_filter.not_rel_types)
+
return " AND ".join(clauses), args
@@ -395,6 +415,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
+ self._min_stream_order_on_start = self.get_room_min_stream_ordering()

def get_room_max_stream_ordering(self) -> int:
"""Get the stream_ordering of regular events that we have committed up to
@@ -596,6 +617,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return ret, key

+ @cancellable
async def get_membership_changes_for_user(
self,
user_id: str,
@@ -1021,28 +1043,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"after": {"event_ids": events_after, "token": end_token},
}

- async def get_all_new_events_stream(
- self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
- ) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
+ async def get_all_new_event_ids_stream(
+ self,
+ from_id: int,
+ current_id: int,
+ limit: int,
+ ) -> Tuple[int, Dict[str, Optional[int]]]:
"""Get all new events
- Returns all events with from_id < stream_ordering <= current_id.
+ Returns all event ids with from_id < stream_ordering <= current_id.
Args:
from_id: the stream_ordering of the last event we processed
current_id: the stream_ordering of the most recently processed event
limit: the maximum number of events to return
- get_prev_content: whether to fetch previous event content

Returns:
- A tuple of (next_id, events, event_to_received_ts), where `next_id`
+ A tuple of (next_id, event_to_received_ts), where `next_id`
is the next value to pass as `from_id` (it will either be the
stream_ordering of the last returned event, or, if fewer than `limit`
events were found, the `current_id`). The `event_to_received_ts` is
- a dictionary mapping event ID to the event `received_ts`.
+ a dictionary mapping event ID to the event `received_ts`, sorted by ascending
+ stream_ordering.
"""
- def get_all_new_events_stream_txn(
+ def get_all_new_event_ids_stream_txn(
txn: LoggingTransaction,
) -> Tuple[int, Dict[str, Optional[int]]]:
sql = (
@@ -1067,15 +1092,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return upper_bound, event_to_received_ts

upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
- "get_all_new_events_stream", get_all_new_events_stream_txn
- )
-
- events = await self.get_events_as_list(
- event_to_received_ts.keys(),
- get_prev_content=get_prev_content,
+ "get_all_new_event_ids_stream", get_all_new_event_ids_stream_txn
)

- return upper_bound, events, event_to_received_ts
+ return upper_bound, event_to_received_ts

async def get_federation_out_pos(self, typ: str) -> int:
if self._need_to_reset_federation_stream_positions:
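With the event-hydration step removed, a caller that previously used
get_all_new_events_stream now fetches the events itself. A hypothetical
caller-side sketch (not part of this patch), based on the lines removed above:

    upper_bound, event_to_received_ts = await store.get_all_new_event_ids_stream(
        from_id, current_id, limit
    )
    # Hydrate the returned IDs into EventBase objects, as the old helper did.
    events = await store.get_events_as_list(event_to_received_ts.keys())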
@@ -1199,8 +1219,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
`to_token`), or `limit` is zero.
"""
- assert int(limit) >= 0
-
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
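For intuition about that convention (an illustration, not code from this
file): with events at stream orderings 10, 11 and 12, the token s10 names the
gap just after event 10 but is written by pointing at the event before the
gap. Combined with the from_id < stream_ordering <= current_id convention
documented earlier in this file, paginating forward from s10 up to s12
returns events 11 and 12: a strict bound on the token's own event, an
inclusive bound on the far end.

    events:             e10     e11     e12
    token s10:              ^
    forward (s10, s12]:         e11     e12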
@@ -1279,8 +1297,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Multiple labels could cause the same event to appear multiple times.
needs_distinct = True

- # If there is a filter on relation_senders and relation_types join to the
- # relations table.
+ # If there is a relation_senders or relation_types filter, join to the
+ # relations table to get events related to the current event.
if event_filter and (
event_filter.related_by_senders or event_filter.related_by_rel_types
):
@@ -1295,6 +1313,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
"""

+ # If there is a rel_types or not_rel_types filter, join to the
+ # event_relations table to get the event's relation information.
+ if event_filter and (event_filter.rel_types or event_filter.not_rel_types):
+ join_clause += """
+ LEFT JOIN event_relations AS event_relation USING (event_id)
+ """
+
if needs_distinct:
select_keywords += " DISTINCT"
@@ -1331,21 +1356,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if rows:
topo = rows[-1].topological_ordering
- toke = rows[-1].stream_ordering
+ token = rows[-1].stream_ordering
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
- toke -= 1
- next_token = RoomStreamToken(topo, toke)
+ token -= 1
+ next_token = RoomStreamToken(topo, token)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, next_token

+ @trace
async def paginate_room_events(
self,
room_id: str,
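To make the backwards token adjustment above concrete, a small sketch with
invented values (the RoomStreamToken construction is as shown in the hunk):

    # Last row of a backwards chunk: invented orderings for illustration.
    topo, stream = 5, 12

    direction = "b"
    if direction == "b":
        # The naive token points *after* the last returned event; step the
        # stream part back one so the token points just before that event
        # and the next backwards page does not re-include it.
        stream -= 1

    print((topo, stream))  # (5, 11) -> RoomStreamToken(5, 11)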