From c85c9125627a62c73711786723be12be30d7a81e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Oct 2015 15:48:31 +0100 Subject: Add basic full text search impl. --- synapse/rest/client/v1/room.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 23871f161e..35bd702a43 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -529,6 +529,22 @@ class RoomTypingRestServlet(ClientV1RestServlet): defer.returnValue((200, {})) +class SearchRestServlet(ClientV1RestServlet): + PATTERN = client_path_pattern( + "/search$" + ) + + @defer.inlineCallbacks + def on_POST(self, request): + auth_user, _ = yield self.auth.get_user_by_req(request) + + content = _parse_json(request) + + results = yield self.handlers.search_handler.search(content) + + defer.returnValue((200, results)) + + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -585,3 +601,4 @@ def register_servlets(hs, http_server): RoomInitialSyncRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) + SearchRestServlet(hs).register(http_server) -- cgit 1.4.1 From 61561b9df791ec90e287e535cc75831c2016bf36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2015 10:49:53 +0100 Subject: Keep FTS indexes up to date. Only search through rooms currently joined --- synapse/handlers/search.py | 31 ++++++++++++++++++++++--------- synapse/rest/client/v1/room.py | 2 +- synapse/storage/events.py | 2 ++ synapse/storage/room.py | 22 ++++++++++++++++++++++ synapse/storage/schema/delta/24/fts.py | 3 ++- synapse/storage/search.py | 7 ++++++- 6 files changed, 55 insertions(+), 12 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 8b997fc394..b6bdb752e9 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -65,7 +65,7 @@ class SearchHandler(BaseHandler): super(SearchHandler, self).__init__(hs) @defer.inlineCallbacks - def search(self, content): + def search(self, user, content): constraint_dicts = content["search_categories"]["room_events"]["constraints"] constraints = [RoomConstraint.from_dict(c)for c in constraint_dicts] @@ -76,20 +76,33 @@ class SearchHandler(BaseHandler): raise SynapseError(400, "Only one constraint can be FTS") fts = True - res = yield self.hs.get_datastore().search_msgs(constraints) + rooms = yield self.store.get_rooms_for_user( + user.to_string(), + ) - time_now = self.hs.get_clock().time_msec() + # For some reason the list of events contains duplicates + # TODO(paul): work out why because I really don't think it should + room_ids = set(r.room_id for r in rooms) - results = [ - { + res = yield self.store.search_msgs(room_ids, constraints) + + time_now = self.clock.time_msec() + + results = { + r["result"].event_id: { "rank": r["rank"], "result": serialize_event(r["result"], time_now) } for r in res - ] + } logger.info("returning: %r", results) - results.sort(key=lambda r: -r["rank"]) - - defer.returnValue(results) + defer.returnValue({ + "search_categories": { + "room_events": { + "results": results, + "count": len(results) + } + } + }) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 35bd702a43..94adabca62 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -540,7 +540,7 @@ class SearchRestServlet(ClientV1RestServlet): content = _parse_json(request) - results = yield self.handlers.search_handler.search(content) + results = yield self.handlers.search_handler.search(auth_user, content) defer.returnValue((200, results)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 416ef6af93..e6c1abfc27 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -307,6 +307,8 @@ class EventsStore(SQLBaseStore): self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Message: + self._store_room_message_txn(txn, event) elif event.type == EventTypes.Redaction: self._store_redaction(txn, event) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 5e07b7e0e5..e4e830944a 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -175,6 +175,10 @@ class RoomStore(SQLBaseStore): }, ) + self._store_event_search_txn( + txn, event, "content.topic", event.content["topic"] + ) + def _store_room_name_txn(self, txn, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( @@ -187,6 +191,24 @@ class RoomStore(SQLBaseStore): } ) + self._store_event_search_txn( + txn, event, "content.name", event.content["name"] + ) + + def _store_room_message_txn(self, txn, event): + if hasattr(event, "content") and "body" in event.content: + self._store_event_search_txn( + txn, event, "content.body", event.content["body"] + ) + + def _store_event_search_txn(self, txn, event, key, value): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, vector)" + " VALUES (?,?,?,to_tsvector('english', ?))" + ) + + txn.execute(sql, (event.event_id, event.room_id, key, value,)) + @cachedInlineCallbacks() def get_room_name_and_aliases(self, room_id): def f(txn): diff --git a/synapse/storage/schema/delta/24/fts.py b/synapse/storage/schema/delta/24/fts.py index 5680332758..05f1605fdd 100644 --- a/synapse/storage/schema/delta/24/fts.py +++ b/synapse/storage/schema/delta/24/fts.py @@ -44,7 +44,8 @@ INSERT INTO event_search SELECT FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic'; -CREATE INDEX event_search_idx ON event_search USING gin(vector); +CREATE INDEX event_search_fts_idx ON event_search USING gin(vector); +CREATE INDEX event_search_ev_idx ON event_search(event_id); """ diff --git a/synapse/storage/search.py b/synapse/storage/search.py index eea4477765..e66b5f9edc 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -21,11 +21,16 @@ from synapse.api.constants import KnownRoomEventKeys, SearchConstraintTypes class SearchStore(SQLBaseStore): @defer.inlineCallbacks - def search_msgs(self, constraints): + def search_msgs(self, room_ids, constraints): clauses = [] args = [] fts = None + clauses.append( + "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),) + ) + args.extend(room_ids) + for c in constraints: local_clauses = [] if c.search_type == SearchConstraintTypes.FTS: -- cgit 1.4.1 From 68b7fc3e2ba0aae7813b0bae52370860b5cd9f26 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Oct 2015 17:26:18 +0100 Subject: Add rooms that the user has left under archived in v2 sync. --- synapse/handlers/sync.py | 128 ++++++++++++++++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 29 ++++++-- synapse/storage/roommember.py | 13 ++++ 3 files changed, 161 insertions(+), 9 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee6b881de1..1891cd088c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -61,18 +61,37 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ return bool(self.timeline or self.state or self.ephemeral) +class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ + "room_id", + "timeline", + "state", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.timeline or self.state) + + class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ "room_id", "invite", ])): __slots__ = [] + def __nonzero__(self): + """Invited rooms should always be reported to the client""" + return True + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. + "archived", # ArchivedSyncResult for each archived room. ])): __slots__ = [] @@ -156,11 +175,14 @@ class SyncHandler(BaseHandler): ) room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=[Membership.INVITE, Membership.JOIN] + membership_list=[ + Membership.INVITE, Membership.JOIN, Membership.LEAVE + ] ) joined = [] invited = [] + archived = [] for event in room_list: if event.membership == Membership.JOIN: room_sync = yield self.initial_sync_for_joined_room( @@ -173,11 +195,23 @@ class SyncHandler(BaseHandler): room_id=event.room_id, invite=invite, )) + elif event.membership == Membership.LEAVE: + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_sync = yield self.initial_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + ) + archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, joined=joined, invited=invited, + archived=archived, next_batch=now_token, )) @@ -204,6 +238,28 @@ class SyncHandler(BaseHandler): ephemeral=[], )) + @defer.inlineCallbacks + def initial_sync_for_archived_room(self, room_id, sync_config, + leave_event_id, leave_token): + """Sync a room for a client which is starting without any state + Returns: + A Deferred JoinedSyncResult. + """ + + batch = yield self.load_filtered_recents( + room_id, sync_config, leave_token, + ) + + leave_state = yield self.store.get_state_for_events( + [leave_event_id], None + ) + + defer.returnValue(ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=leave_state[leave_event_id].values(), + )) + @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -257,18 +313,22 @@ class SyncHandler(BaseHandler): ) joined = [] + archived = [] if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. invite_events = [] + leave_events = [] events_by_room_id = {} for event in room_events: events_by_room_id.setdefault(event.room_id, []).append(event) if event.room_id not in joined_room_ids: if (event.type == EventTypes.Member - and event.membership == Membership.INVITE and event.state_key == sync_config.user.to_string()): - invite_events.append(event) + if event.membership == Membership.INVITE: + invite_events.append(event) + elif event.membership == Membership.LEAVE: + leave_events.append(event) for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) @@ -296,11 +356,16 @@ class SyncHandler(BaseHandler): ) if room_sync: joined.append(room_sync) + else: invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) + leave_events = yield self.store.get_leave_events_for_user( + sync_config.user.to_string() + ) + for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, @@ -309,6 +374,12 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) + for leave_event in leave_events: + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, leave_event, since_token + ) + archived.append(room_sync) + invited = [ InvitedSyncResult(room_id=event.room_id, invite=event) for event in invite_events @@ -318,6 +389,7 @@ class SyncHandler(BaseHandler): presence=presence, joined=joined, invited=invited, + archived=archived, next_batch=now_token, )) @@ -416,6 +488,56 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) + @defer.inlineCallbacks + def incremental_sync_for_archived_room(self, sync_config, leave_event, + since_token): + """ Get the incremental delta needed to bring the client up to date for + the archived room. + Returns: + A Deferred ArchivedSyncResult + """ + + stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + + leave_token = since_token.copy_and_replace("room_key", stream_token) + + batch = yield self.load_filtered_recents( + leave_event.room_id, sync_config, leave_token, since_token, + ) + + logging.debug("Recents %r", batch) + + # TODO(mjark): This seems racy since this isn't being passed a + # token to indicate what point in the stream this is + leave_state = yield self.store.get_state_for_events( + [leave_event.event_id], None + ) + + state_events_at_leave = leave_state[leave_event.event_id].values() + + state_at_previous_sync = yield self.get_state_at_previous_sync( + leave_event.room_id, since_token=since_token + ) + + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=state_events_at_leave, + ) + + room_sync = ArchivedSyncResult( + room_id=leave_event.room_id, + timeline=batch, + state=state_events_delta, + ) + + logging.debug("Room sync: %r", room_sync) + + defer.returnValue(room_sync) + + @defer.inlineCallbacks def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index fffecb24f5..73473a7e6b 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -136,6 +136,10 @@ class SyncRestServlet(RestServlet): sync_result.invited, filter, time_now, token_id ) + archived = self.encode_archived( + sync_result.archived, filter, time_now, token_id + ) + response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now @@ -143,7 +147,7 @@ class SyncRestServlet(RestServlet): "rooms": { "joined": joined, "invited": invited, - "archived": {}, + "archived": archived, }, "next_batch": sync_result.next_batch.to_string(), } @@ -182,14 +186,20 @@ class SyncRestServlet(RestServlet): return invited + def encode_archived(self, rooms, filter, time_now, token_id): + joined = {} + for room in rooms: + joined[room.room_id] = self.encode_room( + room, filter, time_now, token_id, joined=False + ) + + return joined + @staticmethod - def encode_room(room, filter, time_now, token_id): + def encode_room(room, filter, time_now, token_id, joined=True): event_map = {} state_events = filter.filter_room_state(room.state) - timeline_events = filter.filter_room_timeline(room.timeline.events) - ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) state_event_ids = [] - timeline_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -198,6 +208,8 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) + timeline_events = filter.filter_room_timeline(room.timeline.events) + timeline_event_ids = [] for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -205,6 +217,7 @@ class SyncRestServlet(RestServlet): event_format=format_event_for_client_v2_without_event_id, ) timeline_event_ids.append(event.event_id) + result = { "event_map": event_map, "timeline": { @@ -213,8 +226,12 @@ class SyncRestServlet(RestServlet): "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, - "ephemeral": {"events": ephemeral_events}, } + + if joined: + ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) + result["ephemeral"] = {"events": ephemeral_events} + return result diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index dd98dcfda8..623400fd36 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore): invites.event_id for invite in invites ])) + def get_leave_events_for_user(self, user_id): + """ Get all the leave events for a user + Args: + user_id (str): The user ID. + Returns: + A deferred list of event objects. + """ + return self.get_rooms_for_user_where_membership_is( + user_id, [Membership.LEAVE] + ).addCallback(lambda leaves: self._get_events([ + leave.event_id for leave in leaves + ])) + def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user matches one in the membership list. -- cgit 1.4.1 From b02a342750f84ffebb793aa5d3c80780684dd147 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 20 Oct 2015 11:07:50 +0100 Subject: Don't 500 when the email doesn't map to a valid user ID. --- synapse/rest/client/v1/login.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index dacc416055..b2e4cb8eaa 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -101,6 +101,10 @@ class LoginRestServlet(ClientV1RestServlet): user_id = yield self.hs.get_datastore().get_user_id_by_threepid( login_submission['medium'], login_submission['address'] ) + if not user_id: + raise LoginError( + 401, "Unrecognised address", errcode=Codes.UNAUTHORIZED + ) else: user_id = login_submission['user'] -- cgit 1.4.1 From 45cd2b023399dc79a77cf59a356ed1c130d970d2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Oct 2015 15:33:25 +0100 Subject: Refactor api.filtering to have a Filter API --- synapse/api/filtering.py | 153 +++++++++++++---------------------- synapse/rest/client/v2_alpha/sync.py | 4 +- tests/api/test_filtering.py | 57 +++++++------ 3 files changed, 88 insertions(+), 126 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index e79e91e7eb..cd7a465e97 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -24,7 +24,7 @@ class Filtering(object): def get_user_filter(self, user_localpart, filter_id): result = self.store.get_user_filter(user_localpart, filter_id) - result.addCallback(Filter) + result.addCallback(FilterCollection) return result def add_user_filter(self, user_localpart, user_filter): @@ -131,125 +131,82 @@ class Filtering(object): raise SynapseError(400, "Bad bundle_updates: expected bool.") -class Filter(object): +class FilterCollection(object): def __init__(self, filter_json): self.filter_json = filter_json + self.room_timeline_filter = Filter( + self.filter_json.get("room", {}).get("timeline", {}) + ) + + self.room_state_filter = Filter( + self.filter_json.get("room", {}).get("state", {}) + ) + + self.room_ephemeral_filter = Filter( + self.filter_json.get("room", {}).get("ephemeral", {}) + ) + + self.presence_filter = Filter( + self.filter_json.get("presence", {}) + ) + def timeline_limit(self): - return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10) + return self.room_timeline_filter.limit() def presence_limit(self): - return self.filter_json.get("presence", {}).get("limit", 10) + return self.presence_filter.limit() def ephemeral_limit(self): - return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10) + return self.room_ephemeral_filter.limit() def filter_presence(self, events): - return self._filter_on_key(events, ["presence"]) + return self.presence_filter.filter(events) def filter_room_state(self, events): - return self._filter_on_key(events, ["room", "state"]) + return self.room_state_filter.filter(events) def filter_room_timeline(self, events): - return self._filter_on_key(events, ["room", "timeline"]) + return self.room_timeline_filter.filter(events) def filter_room_ephemeral(self, events): - return self._filter_on_key(events, ["room", "ephemeral"]) - - def _filter_on_key(self, events, keys): - filter_json = self.filter_json - if not filter_json: - return events - - try: - # extract the right definition from the filter - definition = filter_json - for key in keys: - definition = definition[key] - return self._filter_with_definition(events, definition) - except KeyError: - # return all events if definition isn't specified. - return events - - def _filter_with_definition(self, events, definition): - return [e for e in events if self._passes_definition(definition, e)] - - def _passes_definition(self, definition, event): - """Check if the event passes the filter definition - Args: - definition(dict): The filter definition to check against - event(dict or Event): The event to check - Returns: - True if the event passes the filter in the definition - """ - if type(event) is dict: - room_id = event.get("room_id") - sender = event.get("sender") - event_type = event["type"] - else: - room_id = getattr(event, "room_id", None) - sender = getattr(event, "sender", None) - event_type = event.type - return self._event_passes_definition( - definition, room_id, sender, event_type - ) + return self.room_ephemeral_filter.filter(events) - def _event_passes_definition(self, definition, room_id, sender, - event_type): - """Check if the event passes through the given definition. - Args: - definition(dict): The definition to check against. - room_id(str): The id of the room this event is in or None. - sender(str): The sender of the event - event_type(str): The type of the event. - Returns: - True if the event passes through the filter. - """ - # Algorithm notes: - # For each key in the definition, check the event meets the criteria: - # * For types: Literal match or prefix match (if ends with wildcard) - # * For senders/rooms: Literal match only - # * "not_" checks take presedence (e.g. if "m.*" is in both 'types' - # and 'not_types' then it is treated as only being in 'not_types') - - # room checks - if room_id is not None: - allow_rooms = definition.get("rooms", None) - reject_rooms = definition.get("not_rooms", None) - if reject_rooms and room_id in reject_rooms: - return False - if allow_rooms and room_id not in allow_rooms: - return False +class Filter(object): + def __init__(self, filter_json): + self.filter_json = filter_json - # sender checks - if sender is not None: - allow_senders = definition.get("senders", None) - reject_senders = definition.get("not_senders", None) - if reject_senders and sender in reject_senders: - return False - if allow_senders and sender not in allow_senders: + def check(self, event): + literal_keys = { + "rooms": lambda v: event.room_id == v, + "senders": lambda v: event.sender == v, + "types": lambda v: _matches_wildcard(event.type, v) + } + + for name, match_func in literal_keys.items(): + not_name = "not_%s" % (name,) + disallowed_values = self.filter_json.get(not_name, []) + if any(map(match_func, disallowed_values)): return False - # type checks - if "not_types" in definition: - for def_type in definition["not_types"]: - if self._event_matches_type(event_type, def_type): + allowed_values = self.filter_json.get(name, None) + if allowed_values is not None: + if not any(map(match_func, allowed_values)): return False - if "types" in definition: - included = False - for def_type in definition["types"]: - if self._event_matches_type(event_type, def_type): - included = True - break - if not included: - return False return True - def _event_matches_type(self, event_type, def_type): - if def_type.endswith("*"): - type_prefix = def_type[:-1] - return event_type.startswith(type_prefix) - else: - return event_type == def_type + def filter(self, events): + return filter(self.check, events) + + def limit(self): + return self.filter_json.get("limit", 10) + + +def _matches_wildcard(actual_value, filter_value): + if filter_value.endswith("*"): + type_prefix = filter_value[:-1] + return actual_value.startswith(type_prefix) + else: + return actual_value == filter_value diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index fffecb24f5..5e27a859f9 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -23,7 +23,7 @@ from synapse.types import StreamToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_event_id, ) -from synapse.api.filtering import Filter +from synapse.api.filtering import FilterCollection from ._base import client_v2_pattern import copy @@ -103,7 +103,7 @@ class SyncRestServlet(RestServlet): user.localpart, filter_id ) except: - filter = Filter({}) + filter = FilterCollection({}) sync_config = SyncConfig( user=user, diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 6942cdac51..9f9af2d783 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -23,10 +23,17 @@ from tests.utils import ( ) from synapse.types import UserID -from synapse.api.filtering import Filter +from synapse.api.filtering import FilterCollection, Filter user_localpart = "test_user" -MockEvent = namedtuple("MockEvent", "sender type room_id") +# MockEvent = namedtuple("MockEvent", "sender type room_id") + + +def MockEvent(**kwargs): + ev = NonCallableMock(spec_set=kwargs.keys()) + ev.configure_mock(**kwargs) + return ev + class FilteringTestCase(unittest.TestCase): @@ -44,7 +51,6 @@ class FilteringTestCase(unittest.TestCase): ) self.filtering = hs.get_filtering() - self.filter = Filter({}) self.datastore = hs.get_datastore() @@ -57,8 +63,9 @@ class FilteringTestCase(unittest.TestCase): type="m.room.message", room_id="!foo:bar" ) + self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_types_works_with_wildcards(self): @@ -71,7 +78,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_types_works_with_unknowns(self): @@ -84,7 +91,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_types_works_with_literals(self): @@ -97,7 +104,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_types_works_with_wildcards(self): @@ -110,7 +117,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_types_works_with_unknowns(self): @@ -123,7 +130,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_types_takes_priority_over_types(self): @@ -137,7 +144,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_senders_works_with_literals(self): @@ -150,7 +157,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_senders_works_with_unknowns(self): @@ -163,7 +170,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_senders_works_with_literals(self): @@ -176,7 +183,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_senders_works_with_unknowns(self): @@ -189,7 +196,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_senders_takes_priority_over_senders(self): @@ -203,7 +210,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!foo:bar" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_rooms_works_with_literals(self): @@ -216,7 +223,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!secretbase:unknown" ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_rooms_works_with_unknowns(self): @@ -229,7 +236,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!anothersecretbase:unknown" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_rooms_works_with_literals(self): @@ -242,7 +249,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!anothersecretbase:unknown" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_rooms_works_with_unknowns(self): @@ -255,7 +262,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!anothersecretbase:unknown" ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_not_rooms_takes_priority_over_rooms(self): @@ -269,7 +276,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!secretbase:unknown" ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_combined_event(self): @@ -287,7 +294,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!stage:unknown" # yup ) self.assertTrue( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_combined_event_bad_sender(self): @@ -305,7 +312,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!stage:unknown" # yup ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_combined_event_bad_room(self): @@ -323,7 +330,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!piggyshouse:muppets" # nope ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) def test_definition_combined_event_bad_type(self): @@ -341,7 +348,7 @@ class FilteringTestCase(unittest.TestCase): room_id="!stage:unknown" # yup ) self.assertFalse( - self.filter._passes_definition(definition, event) + Filter(definition).check(event) ) @defer.inlineCallbacks @@ -359,7 +366,6 @@ class FilteringTestCase(unittest.TestCase): event = MockEvent( sender="@foo:bar", type="m.profile", - room_id="!foo:bar" ) events = [event] @@ -386,7 +392,6 @@ class FilteringTestCase(unittest.TestCase): event = MockEvent( sender="@foo:bar", type="custom.avatar.3d.crazy", - room_id="!foo:bar" ) events = [event] -- cgit 1.4.1 From ede07434e069d1b143993a3b492428b69a515856 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 21 Oct 2015 09:42:07 +0100 Subject: Use 403 and message to match handlers/auth --- synapse/rest/client/v1/login.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index b2e4cb8eaa..e71cf7e43e 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -102,9 +102,7 @@ class LoginRestServlet(ClientV1RestServlet): login_submission['medium'], login_submission['address'] ) if not user_id: - raise LoginError( - 401, "Unrecognised address", errcode=Codes.UNAUTHORIZED - ) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) else: user_id = login_submission['user'] -- cgit 1.4.1 From 5025ba959f2b91919a13d1c3b014487d68c41ad7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Oct 2015 10:37:04 +0100 Subject: Add config option to disable password login --- synapse/config/cas.py | 3 ++- synapse/config/homeserver.py | 4 +++- synapse/config/password.py | 32 ++++++++++++++++++++++++++++++++ synapse/config/saml2.py | 3 ++- synapse/rest/client/v1/login.py | 8 +++++++- 5 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 synapse/config/password.py (limited to 'synapse/rest/client') diff --git a/synapse/config/cas.py b/synapse/config/cas.py index d268680729..a337ae6ca0 100644 --- a/synapse/config/cas.py +++ b/synapse/config/cas.py @@ -25,7 +25,7 @@ class CasConfig(Config): def read_config(self, config): cas_config = config.get("cas_config", None) if cas_config: - self.cas_enabled = True + self.cas_enabled = cas_config.get("enabled", True) self.cas_server_url = cas_config["server_url"] self.cas_required_attributes = cas_config.get("required_attributes", {}) else: @@ -37,6 +37,7 @@ class CasConfig(Config): return """ # Enable CAS for registration and login. #cas_config: + # enabled: true # server_url: "https://cas-server.com" # #required_attributes: # # name: value diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 3039f3c0bf..4743e6abc5 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -27,12 +27,14 @@ from .appservice import AppServiceConfig from .key import KeyConfig from .saml2 import SAML2Config from .cas import CasConfig +from .password import PasswordConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, - AppServiceConfig, KeyConfig, SAML2Config, CasConfig): + AppServiceConfig, KeyConfig, SAML2Config, CasConfig, + PasswordConfig,): pass diff --git a/synapse/config/password.py b/synapse/config/password.py new file mode 100644 index 0000000000..1a3e278472 --- /dev/null +++ b/synapse/config/password.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class PasswordConfig(Config): + """Password login configuration + """ + + def read_config(self, config): + password_config = config.get("password_config", {}) + self.password_enabled = password_config.get("enabled", True) + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Enable password for login. + password_config: + enabled: true + """ diff --git a/synapse/config/saml2.py b/synapse/config/saml2.py index 4c6133cf22..8d7f443021 100644 --- a/synapse/config/saml2.py +++ b/synapse/config/saml2.py @@ -33,7 +33,7 @@ class SAML2Config(Config): def read_config(self, config): saml2_config = config.get("saml2_config", None) if saml2_config: - self.saml2_enabled = True + self.saml2_enabled = saml2_config.get("enabled", True) self.saml2_config_path = saml2_config["config_path"] self.saml2_idp_redirect_url = saml2_config["idp_redirect_url"] else: @@ -49,6 +49,7 @@ class SAML2Config(Config): # the user back to /login/saml2 with proper info. # See pysaml2 docs for format of config. #saml2_config: + # enabled: true # config_path: "%s/sp_conf.py" # idp_redirect_url: "http://%s/idp" """ % (config_dir_path, server_name) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 2e3e4f39f3..00ec8fcd74 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -43,6 +43,7 @@ class LoginRestServlet(ClientV1RestServlet): def __init__(self, hs): super(LoginRestServlet, self).__init__(hs) self.idp_redirect_url = hs.config.saml2_idp_redirect_url + self.password_enabled = hs.config.password_enabled self.saml2_enabled = hs.config.saml2_enabled self.cas_enabled = hs.config.cas_enabled self.cas_server_url = hs.config.cas_server_url @@ -50,11 +51,13 @@ class LoginRestServlet(ClientV1RestServlet): self.servername = hs.config.server_name def on_GET(self, request): - flows = [{"type": LoginRestServlet.PASS_TYPE}] + flows = [] if self.saml2_enabled: flows.append({"type": LoginRestServlet.SAML2_TYPE}) if self.cas_enabled: flows.append({"type": LoginRestServlet.CAS_TYPE}) + if self.password_enabled: + flows.append({"type": LoginRestServlet.PASS_TYPE}) return (200, {"flows": flows}) def on_OPTIONS(self, request): @@ -65,6 +68,9 @@ class LoginRestServlet(ClientV1RestServlet): login_submission = _parse_json(request) try: if login_submission["type"] == LoginRestServlet.PASS_TYPE: + if not self.password_enabled: + raise SynapseError(400, "Password login has been disabled.") + result = yield self.do_password_login(login_submission) defer.returnValue(result) elif self.saml2_enabled and (login_submission["type"] == -- cgit 1.4.1 From c79c4f9b146c2e44a26570756f2ecf2e5a6772dc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 26 Oct 2015 18:47:18 +0000 Subject: Implement full_state incremental sync A hopefully-complete implementation of the full_state incremental sync, as specced at https://github.com/matrix-org/matrix-doc/pull/133. This actually turns out to be a relatively simple modification to the initial sync implementation. --- synapse/handlers/sync.py | 51 ++++++++++++++++++++++++------------ synapse/rest/client/v2_alpha/sync.py | 6 +++-- 2 files changed, 38 insertions(+), 19 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b8e2c81969..8601f22179 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -113,15 +113,20 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() @defer.inlineCallbacks - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. Returns: A Deferred SyncResult. """ - if timeout == 0 or since_token is None: - result = yield self.current_sync_for_user(sync_config, since_token) + + if timeout == 0 or since_token is None or full_state: + # we are going to return immediately, so don't bother calling + # notifier.wait_for_events. + result = yield self.current_sync_for_user(sync_config, since_token, + full_state=full_state) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): @@ -146,19 +151,24 @@ class SyncHandler(BaseHandler): ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None): + def current_sync_for_user(self, sync_config, since_token=None, + full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None: - return self.initial_sync(sync_config) + if since_token is None or full_state: + return self.full_state_sync(sync_config, since_token) else: return self.incremental_sync_with_gap(sync_config, since_token) @defer.inlineCallbacks - def initial_sync(self, sync_config): - """Get a sync for a client which is starting without any state + def full_state_sync(self, sync_config, timeline_since_token): + """Get a sync for a client which is starting without any state. + + If a 'message_since_token' is given, only timeline events which have + happened since that token will be returned. + Returns: A Deferred SyncResult. """ @@ -192,8 +202,12 @@ class SyncHandler(BaseHandler): archived = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.initial_sync_for_joined_room( - event.room_id, sync_config, now_token, typing_by_room + room_sync = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + typing_by_room=typing_by_room ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -206,11 +220,12 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync = yield self.initial_sync_for_archived_room( + room_sync = yield self.full_state_sync_for_archived_room( sync_config=sync_config, room_id=event.room_id, leave_event_id=event.event_id, leave_token=leave_token, + timeline_since_token=timeline_since_token, ) archived.append(room_sync) @@ -223,15 +238,16 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def initial_sync_for_joined_room(self, room_id, sync_config, now_token, - typing_by_room): + def full_state_sync_for_joined_room(self, room_id, sync_config, + now_token, timeline_since_token, + typing_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, + room_id, sync_config, now_token, since_token=timeline_since_token ) current_state = yield self.state_handler.get_current_state( @@ -278,15 +294,16 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, typing_by_room)) @defer.inlineCallbacks - def initial_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token): + def full_state_sync_for_archived_room(self, room_id, sync_config, + leave_event_id, leave_token, + timeline_since_token): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, + room_id, sync_config, leave_token, since_token=timeline_since_token ) leave_state = yield self.store.get_state_for_events( diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 6c4f2b7cd4..1840eef775 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, parse_integer + RestServlet, parse_string, parse_integer, parse_boolean ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken @@ -90,6 +90,7 @@ class SyncRestServlet(RestServlet): allowed_values=self.ALLOWED_PRESENCE ) filter_id = parse_string(request, "filter", default=None) + full_state = parse_boolean(request, "full_state", default=False) logger.info( "/sync: user=%r, timeout=%r, since=%r," @@ -120,7 +121,8 @@ class SyncRestServlet(RestServlet): try: sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout + sync_config, since_token=since_token, timeout=timeout, + full_state=full_state ) finally: if set_presence == "online": -- cgit 1.4.1 From 5cb298c934adf9c7f42d06ea1ac74ab2bc651c23 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Oct 2015 13:45:56 +0000 Subject: Add room context api --- synapse/handlers/__init__.py | 3 +- synapse/handlers/room.py | 42 ++++++++++++++++ synapse/rest/client/v1/room.py | 36 +++++++++++++ synapse/storage/stream.py | 111 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 190 insertions(+), 2 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 87b4d381c7..6a2339f2eb 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,7 @@ from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( - RoomCreationHandler, RoomMemberHandler, RoomListHandler + RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler, ) from .message import MessageHandler from .events import EventStreamHandler, EventHandler @@ -70,3 +70,4 @@ class Handlers(object): self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) + self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 60f9fa58b0..dd93a5d04d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -33,6 +33,7 @@ from collections import OrderedDict from unpaddedbase64 import decode_base64 import logging +import math import string logger = logging.getLogger(__name__) @@ -747,6 +748,47 @@ class RoomListHandler(BaseHandler): defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) +class RoomContextHandler(BaseHandler): + @defer.inlineCallbacks + def get_event_context(self, user, room_id, event_id, limit): + before_limit = math.floor(limit/2.) + after_limit = limit - before_limit + + now_token = yield self.hs.get_event_sources().get_current_token() + + results = yield self.store.get_events_around( + room_id, event_id, before_limit, after_limit + ) + + results["events_before"] = yield self._filter_events_for_client( + user.to_string(), results["events_before"] + ) + + results["events_after"] = yield self._filter_events_for_client( + user.to_string(), results["events_after"] + ) + + if results["events_after"]: + last_event_id = results["events_after"][-1].event_id + else: + last_event_id = event_id + + state = yield self.store.get_state_for_events( + [last_event_id], None + ) + results["state"] = state[last_event_id].values() + + results["start"] = now_token.copy_and_replace( + "room_key", results["start"] + ).to_string() + + results["end"] = now_token.copy_and_replace( + "room_key", results["end"] + ).to_string() + + defer.returnValue(results) + + class RoomEventSource(object): def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 4cee1c1599..2dcaee86cd 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -397,6 +397,41 @@ class RoomTriggerBackfill(ClientV1RestServlet): defer.returnValue((200, res)) +class RoomEventContext(ClientV1RestServlet): + PATTERN = client_path_pattern( + "/rooms/(?P[^/]*)/context/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(RoomEventContext, self).__init__(hs) + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_GET(self, request, room_id, event_id): + user, _ = yield self.auth.get_user_by_req(request) + + limit = int(request.args.get("limit", [10])[0]) + + results = yield self.handlers.room_context_handler.get_event_context( + user, room_id, event_id, limit, + ) + + time_now = self.clock.time_msec() + results["events_before"] = [ + serialize_event(event, time_now) for event in results["events_before"] + ] + results["events_after"] = [ + serialize_event(event, time_now) for event in results["events_after"] + ] + results["state"] = [ + serialize_event(event, time_now) for event in results["state"] + ] + + logger.info("Responding with %r", results) + + defer.returnValue((200, results)) + + # TODO: Needs unit testing class RoomMembershipRestServlet(ClientV1RestServlet): @@ -628,3 +663,4 @@ def register_servlets(hs, http_server): RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) + RoomEventContext(hs).register(http_server) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3cab06fdef..f2eecf52f9 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -23,7 +23,7 @@ paginate bacwards. This is implemented by keeping two ordering columns: stream_ordering and topological_ordering. Stream ordering is basically insertion/received order -(except for events from backfill requests). The topolgical_ordering is a +(except for events from backfill requests). The topological_ordering is a weak ordering of events based on the pdu graph. This means that we have to have two different types of tokens, depending on @@ -436,3 +436,112 @@ class StreamStore(SQLBaseStore): internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) + + @defer.inlineCallbacks + def get_events_around(self, room_id, event_id, before_limit, after_limit): + results = yield self.runInteraction( + "get_events_around", self._get_events_around_txn, + room_id, event_id, before_limit, after_limit + ) + + events_before = yield self._get_events( + [e for e in results["before"]["event_ids"]], + get_prev_content=True + ) + + events_after = yield self._get_events( + [e for e in results["after"]["event_ids"]], + get_prev_content=True + ) + + defer.returnValue({ + "events_before": events_before, + "events_after": events_after, + "start": results["before"]["token"], + "end": results["after"]["token"], + }) + + def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit): + results = self._simple_select_one_txn( + txn, + "events", + keyvalues={ + "event_id": event_id, + "room_id": room_id, + }, + retcols=["stream_ordering", "topological_ordering"], + ) + + stream_ordering = results["stream_ordering"] + topological_ordering = results["topological_ordering"] + + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND (topological_ordering < ?" + " OR (topological_ordering = ? AND stream_ordering < ?))" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND (topological_ordering > ?" + " OR (topological_ordering = ? AND stream_ordering > ?))" + " ORDER BY topological_ordering ASC, stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute( + query_before, + ( + room_id, topological_ordering, topological_ordering, + stream_ordering, before_limit, + ) + ) + + rows = self.cursor_to_dict(txn) + events_before = [r["event_id"] for r in rows] + + if rows: + start_token = str(RoomStreamToken( + rows[0]["topological_ordering"], + rows[0]["stream_ordering"] - 1, + )) + else: + start_token = str(RoomStreamToken( + topological_ordering, + stream_ordering - 1, + )) + + txn.execute( + query_after, + ( + room_id, topological_ordering, topological_ordering, + stream_ordering, after_limit, + ) + ) + + rows = self.cursor_to_dict(txn) + events_after = [r["event_id"] for r in rows] + + if rows: + end_token = str(RoomStreamToken( + rows[-1]["topological_ordering"], + rows[-1]["stream_ordering"], + )) + else: + end_token = str(RoomStreamToken( + topological_ordering, + stream_ordering, + )) + + return { + "before": { + "event_ids": events_before, + "token": start_token, + }, + "after": { + "event_ids": events_after, + "token": end_token, + }, + } -- cgit 1.4.1