diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ad2753c1b5..dc76d34a52 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -66,7 +66,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True):
+ as_client_event=True, event_filter=None):
"""Get messages in a room.
Args:
@@ -75,11 +75,11 @@ class MessageHandler(BaseHandler):
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
+ event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
- data_source = self.hs.get_event_sources().sources["room"]
if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
@@ -129,8 +129,13 @@ class MessageHandler(BaseHandler):
room_id, max_topo
)
- events, next_key = yield data_source.get_pagination_rows(
- requester.user, source_config, room_id
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
@@ -144,6 +149,9 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
+ if event_filter:
+ events = event_filter.filter(events)
+
events = yield filter_events_for_client(
self.store,
user_id,
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 86fbe2747d..866a1e9120 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -20,12 +20,14 @@ from .base import ClientV1RestServlet, client_path_patterns
from synapse.api.errors import SynapseError, Codes, AuthError
from synapse.streams.config import PaginationConfig
from synapse.api.constants import EventTypes, Membership
+from synapse.api.filtering import Filter
from synapse.types import UserID, RoomID, RoomAlias
from synapse.events.utils import serialize_event
from synapse.http.servlet import parse_json_object_from_request
import logging
import urllib
+import ujson as json
logger = logging.getLogger(__name__)
@@ -327,12 +329,19 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
request, default_limit=10,
)
as_client_event = "raw" not in request.args
+ filter_bytes = request.args.get("filter", None)
+ if filter_bytes:
+ filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
+ event_filter = Filter(json.loads(filter_json))
+ else:
+ event_filter = None
handler = self.handlers.message_handler
msgs = yield handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
- as_client_event=as_client_event
+ as_client_event=as_client_event,
+ event_filter=event_filter,
)
defer.returnValue((200, msgs))
diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py
index f22ba8db89..38556da9a7 100644
--- a/tests/storage/event_injector.py
+++ b/tests/storage/event_injector.py
@@ -30,6 +30,7 @@ class EventInjector:
def create_room(self, room):
builder = self.event_builder_factory.new({
"type": EventTypes.Create,
+ "sender": "",
"room_id": room.to_string(),
"content": {},
})
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
index 18a6cff0c7..3762b38e37 100644
--- a/tests/storage/test_events.py
+++ b/tests/storage/test_events.py
@@ -37,7 +37,7 @@ class EventsStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_count_daily_messages(self):
- self.db_pool.runQuery("DELETE FROM stats_reporting")
+ yield self.db_pool.runQuery("DELETE FROM stats_reporting")
self.hs.clock.now = 100
@@ -60,7 +60,7 @@ class EventsStoreTestCase(unittest.TestCase):
# it isn't old enough.
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
- self._assert_stats_reporting(1, self.hs.clock.now)
+ yield self._assert_stats_reporting(1, self.hs.clock.now)
# Already reported yesterday, two new events from today.
yield self.event_injector.inject_message(room, user, "Yeah they are!")
@@ -68,21 +68,21 @@ class EventsStoreTestCase(unittest.TestCase):
self.hs.clock.now += 60 * 60 * 24
count = yield self.store.count_daily_messages()
self.assertEqual(2, count) # 2 since yesterday
- self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever
+ yield self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever
# Last reported too recently.
yield self.event_injector.inject_message(room, user, "Who could disagree?")
self.hs.clock.now += 60 * 60 * 22
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
- self._assert_stats_reporting(4, self.hs.clock.now)
+ yield self._assert_stats_reporting(4, self.hs.clock.now)
# Last reported too long ago
yield self.event_injector.inject_message(room, user, "No one.")
self.hs.clock.now += 60 * 60 * 26
count = yield self.store.count_daily_messages()
self.assertIsNone(count)
- self._assert_stats_reporting(5, self.hs.clock.now)
+ yield self._assert_stats_reporting(5, self.hs.clock.now)
# And now let's actually report something
yield self.event_injector.inject_message(room, user, "Indeed.")
@@ -92,7 +92,7 @@ class EventsStoreTestCase(unittest.TestCase):
self.hs.clock.now += (60 * 60 * 24) + 50
count = yield self.store.count_daily_messages()
self.assertEqual(3, count)
- self._assert_stats_reporting(8, self.hs.clock.now)
+ yield self._assert_stats_reporting(8, self.hs.clock.now)
@defer.inlineCallbacks
def _get_last_stream_token(self):
|