diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-11-02 10:57:00 +0000 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-11-02 10:57:00 +0000 |
commit | 0e367563838de76972bac1fd7bc3ab3943b9a57e (patch) | |
tree | f3de1ae204fa232033d8951c31a45a71a797d94c /synapse/handlers | |
parent | Support clients supplying older tokens, fix short poll test (diff) | |
parent | Merge pull request #337 from matrix-org/markjh/v2_sync_joining (diff) | |
download | synapse-0e367563838de76972bac1fd7bc3ab3943b9a57e.tar.xz |
Merge branch 'develop' into markjh/room_tags
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 3 | ||||
-rw-r--r-- | synapse/handlers/room.py | 55 | ||||
-rw-r--r-- | synapse/handlers/search.py | 56 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 61 |
4 files changed, 152 insertions, 23 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 87b4d381c7..6a2339f2eb 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,7 @@ from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( - RoomCreationHandler, RoomMemberHandler, RoomListHandler + RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler, ) from .message import MessageHandler from .events import EventStreamHandler, EventHandler @@ -70,3 +70,4 @@ class Handlers(object): self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) + self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 60f9fa58b0..36878a6c20 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -33,6 +33,7 @@ from collections import OrderedDict from unpaddedbase64 import decode_base64 import logging +import math import string logger = logging.getLogger(__name__) @@ -747,6 +748,60 @@ class RoomListHandler(BaseHandler): defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) +class RoomContextHandler(BaseHandler): + @defer.inlineCallbacks + def get_event_context(self, user, room_id, event_id, limit): + """Retrieves events, pagination tokens and state around a given event + in a room. + + Args: + user (UserID) + room_id (str) + event_id (str) + limit (int): The maximum number of events to return in total + (excluding state). + + Returns: + dict + """ + before_limit = math.floor(limit/2.) + after_limit = limit - before_limit + + now_token = yield self.hs.get_event_sources().get_current_token() + + results = yield self.store.get_events_around( + room_id, event_id, before_limit, after_limit + ) + + results["events_before"] = yield self._filter_events_for_client( + user.to_string(), results["events_before"] + ) + + results["events_after"] = yield self._filter_events_for_client( + user.to_string(), results["events_after"] + ) + + if results["events_after"]: + last_event_id = results["events_after"][-1].event_id + else: + last_event_id = event_id + + state = yield self.store.get_state_for_events( + [last_event_id], None + ) + results["state"] = state[last_event_id].values() + + results["start"] = now_token.copy_and_replace( + "room_key", results["start"] + ).to_string() + + results["end"] = now_token.copy_and_replace( + "room_key", results["end"] + ).to_string() + + defer.returnValue(results) + + class RoomEventSource(object): def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index bbe82b1425..2718e9482e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -51,6 +51,17 @@ class SearchHandler(BaseHandler): "content.body", "content.name", "content.topic", ]) filter_dict = content["search_categories"]["room_events"].get("filter", {}) + event_context = content["search_categories"]["room_events"].get( + "event_context", None + ) + + if event_context is not None: + before_limit = int(event_context.get( + "before_limit", 5 + )) + after_limit = int(event_context.get( + "after_limit", 5 + )) except KeyError: raise SynapseError(400, "Invalid search query") @@ -76,14 +87,57 @@ class SearchHandler(BaseHandler): user.to_string(), filtered_events ) + allowed_events.sort(key=lambda e: -rank_map[e.event_id]) + allowed_events = allowed_events[:search_filter.limit()] + + if event_context is not None: + now_token = yield self.hs.get_event_sources().get_current_token() + + contexts = {} + for event in allowed_events: + res = yield self.store.get_events_around( + event.room_id, event.event_id, before_limit, after_limit + ) + + res["events_before"] = yield self._filter_events_for_client( + user.to_string(), res["events_before"] + ) + + res["events_after"] = yield self._filter_events_for_client( + user.to_string(), res["events_after"] + ) + + res["start"] = now_token.copy_and_replace( + "room_key", res["start"] + ).to_string() + + res["end"] = now_token.copy_and_replace( + "room_key", res["end"] + ).to_string() + + contexts[event.event_id] = res + else: + contexts = {} + # TODO: Add a limit time_now = self.clock.time_msec() + for context in contexts.values(): + context["events_before"] = [ + serialize_event(e, time_now) + for e in context["events_before"] + ] + context["events_after"] = [ + serialize_event(e, time_now) + for e in context["events_after"] + ] + results = { e.event_id: { "rank": rank_map[e.event_id], - "result": serialize_event(e, time_now) + "result": serialize_event(e, time_now), + "context": contexts.get(e.event_id, {}), } for e in allowed_events } diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b8e2c81969..4c5a2353b2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -113,15 +113,20 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() @defer.inlineCallbacks - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. Returns: A Deferred SyncResult. """ - if timeout == 0 or since_token is None: - result = yield self.current_sync_for_user(sync_config, since_token) + + if timeout == 0 or since_token is None or full_state: + # we are going to return immediately, so don't bother calling + # notifier.wait_for_events. + result = yield self.current_sync_for_user(sync_config, since_token, + full_state=full_state) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): @@ -146,19 +151,24 @@ class SyncHandler(BaseHandler): ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None): + def current_sync_for_user(self, sync_config, since_token=None, + full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None: - return self.initial_sync(sync_config) + if since_token is None or full_state: + return self.full_state_sync(sync_config, since_token) else: return self.incremental_sync_with_gap(sync_config, since_token) @defer.inlineCallbacks - def initial_sync(self, sync_config): - """Get a sync for a client which is starting without any state + def full_state_sync(self, sync_config, timeline_since_token): + """Get a sync for a client which is starting without any state. + + If a 'message_since_token' is given, only timeline events which have + happened since that token will be returned. + Returns: A Deferred SyncResult. """ @@ -192,8 +202,12 @@ class SyncHandler(BaseHandler): archived = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.initial_sync_for_joined_room( - event.room_id, sync_config, now_token, typing_by_room + room_sync = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + typing_by_room=typing_by_room ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -206,11 +220,12 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync = yield self.initial_sync_for_archived_room( + room_sync = yield self.full_state_sync_for_archived_room( sync_config=sync_config, room_id=event.room_id, leave_event_id=event.event_id, leave_token=leave_token, + timeline_since_token=timeline_since_token, ) archived.append(room_sync) @@ -223,15 +238,16 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def initial_sync_for_joined_room(self, room_id, sync_config, now_token, - typing_by_room): + def full_state_sync_for_joined_room(self, room_id, sync_config, + now_token, timeline_since_token, + typing_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, + room_id, sync_config, now_token, since_token=timeline_since_token ) current_state = yield self.state_handler.get_current_state( @@ -278,15 +294,16 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, typing_by_room)) @defer.inlineCallbacks - def initial_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token): + def full_state_sync_for_archived_room(self, room_id, sync_config, + leave_event_id, leave_token, + timeline_since_token): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, + room_id, sync_config, leave_token, since_token=timeline_since_token ) leave_state = yield self.store.get_state_for_events( @@ -370,7 +387,7 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token - state = yield self.check_joined_room( + state, limited = yield self.check_joined_room( sync_config, room_id, state ) @@ -379,7 +396,7 @@ class SyncHandler(BaseHandler): timeline=TimelineBatch( events=recents, prev_batch=prev_batch, - limited=False, + limited=limited, ), state=state, ephemeral=typing_by_room.get(room_id, []) @@ -503,7 +520,7 @@ class SyncHandler(BaseHandler): current_state=current_state_events, ) - state_events_delta = yield self.check_joined_room( + state_events_delta, _ = yield self.check_joined_room( sync_config, room_id, state_events_delta ) @@ -610,6 +627,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def check_joined_room(self, sync_config, room_id, state_delta): joined = False + limited = False for event in state_delta: if ( event.type == EventTypes.Member @@ -621,5 +639,6 @@ class SyncHandler(BaseHandler): if joined: res = yield self.state_handler.get_current_state(room_id) state_delta = res.values() + limited = True - defer.returnValue(state_delta) + defer.returnValue((state_delta, limited)) |