diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..4c296d72c0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -33,22 +33,20 @@ what sort order was used:
and stream ordering columns respectively.
"""
+import abc
+import logging
+from collections import namedtuple
+
+from six.moves import range
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
-
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.storage.engines import PostgresEngine
-
-import abc
-import logging
-
-from six.moves import range
-from collections import namedtuple
-
logger = logging.getLogger(__name__)
@@ -350,7 +348,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
end_token (str): The stream token representing now.
Returns:
- Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
+ Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
events and a token pointing to the start of the returned
events.
The events returned are in ascending order.
@@ -381,7 +379,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
end_token (str): The stream token representing now.
Returns:
- Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
+ Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
_EventDictReturn and a token pointing to the start of the returned
events.
The events returned are in ascending order.
@@ -529,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_events_around(self, room_id, event_id, before_limit, after_limit):
+ def get_events_around(
+ self, room_id, event_id, before_limit, after_limit, event_filter=None,
+ ):
"""Retrieve events and pagination tokens around a given event in a
room.
@@ -538,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id (str)
before_limit (int)
after_limit (int)
+ event_filter (Filter|None)
Returns:
dict
@@ -545,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results = yield self.runInteraction(
"get_events_around", self._get_events_around_txn,
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter,
)
events_before = yield self._get_events(
@@ -565,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"end": results["after"]["token"],
})
- def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+ def _get_events_around_txn(
+ self, txn, room_id, event_id, before_limit, after_limit, event_filter,
+ ):
"""Retrieves event_ids and pagination tokens around a given event in a
room.
@@ -574,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id (str)
before_limit (int)
after_limit (int)
+ event_filter (Filter|None)
Returns:
dict
@@ -603,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
+ event_filter=event_filter,
)
events_before = [r.event_id for r in rows]
rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
+ event_filter=event_filter,
)
events_after = [r.event_id for r in rows]
|