summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2018-09-03 21:08:35 +1000
committerGitHub <noreply@github.com>2018-09-03 21:08:35 +1000
commit4fc4b881c58fd638db5f4dac0863721111b67af0 (patch)
treecc1604f5e3b4e0a263e0e11a55b62ef4006a64a1 /synapse/storage/stream.py
parentThe project `matrix-synapse-auto-deploy` does not seem to be maintained anymore. (diff)
parentMerge pull request #3777 from matrix-org/neilj/fix_register_user_registration (diff)
downloadsynapse-4fc4b881c58fd638db5f4dac0863721111b67af0.tar.xz
Merge branch 'develop' into develop
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py36
1 files changed, 21 insertions, 15 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..4c296d72c0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -33,22 +33,20 @@ what sort order was used:
       and stream ordering columns respectively.
 """
 
+import abc
+import logging
+from collections import namedtuple
+
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
-
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.storage.engines import PostgresEngine
-
-import abc
-import logging
-
-from six.moves import range
-from collections import namedtuple
-
 
 logger = logging.getLogger(__name__)
 
@@ -350,7 +348,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             end_token (str): The stream token representing now.
 
         Returns:
-            Deferred[tuple[list[FrozenEvent],  str]]: Returns a list of
+            Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
             events and a token pointing to the start of the returned
             events.
             The events returned are in ascending order.
@@ -381,7 +379,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             end_token (str): The stream token representing now.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn],  str]]: Returns a list of
+            Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
             _EventDictReturn and a token pointing to the start of the returned
             events.
             The events returned are in ascending order.
@@ -529,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
 
     @defer.inlineCallbacks
-    def get_events_around(self, room_id, event_id, before_limit, after_limit):
+    def get_events_around(
+        self, room_id, event_id, before_limit, after_limit, event_filter=None,
+    ):
         """Retrieve events and pagination tokens around a given event in a
         room.
 
@@ -538,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             event_id (str)
             before_limit (int)
             after_limit (int)
+            event_filter (Filter|None)
 
         Returns:
             dict
@@ -545,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = yield self.runInteraction(
             "get_events_around", self._get_events_around_txn,
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter,
         )
 
         events_before = yield self._get_events(
@@ -565,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "end": results["after"]["token"],
         })
 
-    def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+    def _get_events_around_txn(
+        self, txn, room_id, event_id, before_limit, after_limit, event_filter,
+    ):
         """Retrieves event_ids and pagination tokens around a given event in a
         room.
 
@@ -574,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             event_id (str)
             before_limit (int)
             after_limit (int)
+            event_filter (Filter|None)
 
         Returns:
             dict
@@ -603,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         rows, start_token = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
+            event_filter=event_filter,
         )
         events_before = [r.event_id for r in rows]
 
         rows, end_token = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
+            event_filter=event_filter,
         )
         events_after = [r.event_id for r in rows]