From 641420c5e0ca074c6873ee800e14a4e881447ec3 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 13 Nov 2015 15:44:57 +0000 Subject: Clean up room initialSync for guest users --- synapse/handlers/message.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 14051aee99..a92409c6a2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -588,23 +588,28 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = {} - if not is_guest: - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) - receipts_handler = self.hs.get_handlers().receipts_handler + @defer.inlineCallbacks + def get_receipts(): + receipts_handler = self.hs.get_handlers().receipts_handler + receipts = yield receipts_handler.get_receipts_for_room( + room_id, + now_token.receipt_key + ) + defer.returnValue(receipts) presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), - receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), + get_receipts(), self.store.get_recent_events_for_room( room_id, limit=limit, -- cgit 1.5.1 From ba26eb3d5d487edb90c21db7efec631b80adf24b Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 17 Nov 2015 17:17:30 -0500 Subject: Allow users to forget rooms --- synapse/api/auth.py | 7 +++++ synapse/handlers/room.py | 3 ++ synapse/rest/client/v1/room.py | 13 ++++++-- synapse/storage/prepare_database.py | 2 +- synapse/storage/roommember.py | 36 ++++++++++++++++++++++ .../schema/delta/26/forgotten_memberships.sql | 24 +++++++++++++++ 6 files changed, 81 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/26/forgotten_memberships.sql (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 8111b34428..6eaa1150a3 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -207,6 +207,13 @@ class Auth(object): user_id, room_id )) + if membership == Membership.LEAVE: + forgot = yield self.store.did_forget(user_id, room_id) + if forgot: + raise AuthError(403, "User %s not in room %s" % ( + user_id, room_id + )) + defer.returnValue(member) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3f04752581..023b4001b8 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -743,6 +743,9 @@ class RoomMemberHandler(BaseHandler): ) defer.returnValue((token, public_key, key_validity_url, display_name)) + def forget(self, user, room_id): + self.store.forget(user.to_string(), room_id) + class RoomListHandler(BaseHandler): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 139dac1cc3..6952d269ec 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -448,7 +448,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): def register(self, http_server): # /rooms/$roomid/[invite|join|leave] PATTERN = ("/rooms/(?P[^/]*)/" - "(?Pjoin|invite|leave|ban|kick)") + "(?Pjoin|invite|leave|ban|kick|forget)") register_txn_path(self, PATTERN, http_server) @defer.inlineCallbacks @@ -458,6 +458,8 @@ class RoomMembershipRestServlet(ClientV1RestServlet): allow_guest=True ) + effective_membership_action = membership_action + if is_guest and membership_action not in {Membership.JOIN, Membership.LEAVE}: raise AuthError(403, "Guest access not allowed") @@ -488,11 +490,13 @@ class RoomMembershipRestServlet(ClientV1RestServlet): UserID.from_string(state_key) if membership_action == "kick": - membership_action = "leave" + effective_membership_action = "leave" + elif membership_action == "forget": + effective_membership_action = "leave" msg_handler = self.handlers.message_handler - content = {"membership": unicode(membership_action)} + content = {"membership": unicode(effective_membership_action)} if is_guest: content["kind"] = "guest" @@ -509,6 +513,9 @@ class RoomMembershipRestServlet(ClientV1RestServlet): is_guest=is_guest, ) + if membership_action == "forget": + self.handlers.room_member_handler.forget(user, room_id) + defer.returnValue((200, {})) def _has_3pid_invite_keys(self, content): diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 1a74d6e360..9800fd4203 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 25 +SCHEMA_VERSION = 26 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ae1ad56d9a..183855ba40 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -269,3 +269,39 @@ class RoomMemberStore(SQLBaseStore): ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0 defer.returnValue(ret) + + def forget(self, user_id, room_id): + def f(txn): + sql = ( + "UPDATE" + " room_memberships" + " SET" + " forgotten = 1" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + ) + txn.execute(sql, (user_id, room_id)) + self.runInteraction("forget_membership", f) + + @defer.inlineCallbacks + def did_forget(self, user_id, room_id): + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + "FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " forgotten = 1" + ) + txn.execute(sql, (user_id, room_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) + defer.returnValue(count > 0) diff --git a/synapse/storage/schema/delta/26/forgotten_memberships.sql b/synapse/storage/schema/delta/26/forgotten_memberships.sql new file mode 100644 index 0000000000..df55b9c6f6 --- /dev/null +++ b/synapse/storage/schema/delta/26/forgotten_memberships.sql @@ -0,0 +1,24 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Keeps track of what rooms users have left and don't want to be able to + * access again. + * + * If all users on this server have left a room, we can delete the room + * entirely. + */ + + ALTER TABLE room_memberships ADD COLUMN forgotten INTEGER(1) DEFAULT 0; -- cgit 1.5.1 From bed7889703371dca893893d33f67e59e99cda111 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 18 Nov 2015 18:08:22 -0500 Subject: Apply forgetting properly to historical events --- synapse/handlers/_base.py | 10 +++++++++- synapse/storage/roommember.py | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6519f183df..95bb06395a 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -92,7 +92,15 @@ class BaseHandler(object): membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - membership = membership_event.membership + was_forgotten_at_event = yield self.store.was_forgotten_at( + membership_event.user_id, + membership_event.room_id, + membership_event.event_id + ) + if was_forgotten_at_event: + membership = None + else: + membership = membership_event.membership else: membership = None diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 183855ba40..5e92cdc811 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -271,6 +271,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(ret) def forget(self, user_id, room_id): + """Indicate that user_id wishes to discard history for room_id.""" def f(txn): sql = ( "UPDATE" @@ -287,6 +288,9 @@ class RoomMemberStore(SQLBaseStore): @defer.inlineCallbacks def did_forget(self, user_id, room_id): + """Returns whether user_id has elected to discard history for room_id. + + Returns False if they have since re-joined.""" def f(txn): sql = ( "SELECT" @@ -298,10 +302,36 @@ class RoomMemberStore(SQLBaseStore): " AND" " room_id = ?" " AND" - " forgotten = 1" + " forgotten = 0" ) txn.execute(sql, (user_id, room_id)) rows = txn.fetchall() return rows[0][0] count = yield self.runInteraction("did_forget_membership", f) - defer.returnValue(count > 0) + defer.returnValue(count == 0) + + @defer.inlineCallbacks + def was_forgotten_at(self, user_id, room_id, event_id): + """Returns whether user_id has elected to discard history for room_id at event_id. + + event_id must be a membership event.""" + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + "FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " event_id = ?" + " AND" + " forgotten = 1" + ) + txn.execute(sql, (user_id, room_id, event_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership_at", f) + defer.returnValue(count == 1) -- cgit 1.5.1 From 9da4c5340da7e5a8e03a3bd7e028a1c862ce9616 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 19 Nov 2015 10:07:21 -0500 Subject: Simplify code --- synapse/handlers/_base.py | 2 +- synapse/storage/roommember.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 95bb06395a..5fd20285d2 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -93,7 +93,7 @@ class BaseHandler(object): membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: was_forgotten_at_event = yield self.store.was_forgotten_at( - membership_event.user_id, + membership_event.state_key, membership_event.room_id, membership_event.event_id ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 5e92cdc811..c3e11b91da 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -295,7 +295,7 @@ class RoomMemberStore(SQLBaseStore): sql = ( "SELECT" " COUNT(*)" - "FROM" + " FROM" " room_memberships" " WHERE" " user_id = ?" @@ -318,8 +318,8 @@ class RoomMemberStore(SQLBaseStore): def f(txn): sql = ( "SELECT" - " COUNT(*)" - "FROM" + " forgotten" + " FROM" " room_memberships" " WHERE" " user_id = ?" @@ -327,11 +327,9 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" " AND" " event_id = ?" - " AND" - " forgotten = 1" ) txn.execute(sql, (user_id, room_id, event_id)) rows = txn.fetchall() return rows[0][0] - count = yield self.runInteraction("did_forget_membership_at", f) - defer.returnValue(count == 1) + forgot = yield self.runInteraction("did_forget_membership_at", f) + defer.returnValue(forgot == 1) -- cgit 1.5.1 From 76936f43ae0f88d4523fe07b7a9ccf8ddb5563ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Nov 2015 16:40:42 +0000 Subject: Return words to highlight in search results --- synapse/handlers/search.py | 19 +++++-- synapse/storage/search.py | 123 ++++++++++++++++++++++++++++++++++++++------- 2 files changed, 120 insertions(+), 22 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 50688e51a8..6d2197339e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -139,11 +139,18 @@ class SearchHandler(BaseHandler): # Holds the next_batch for the entire result set if one of those exists global_next_batch = None + highlights = set() + if order_by == "rank": - results = yield self.store.search_msgs( + search_result = yield self.store.search_msgs( room_ids, search_term, keys ) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + results_map = {r["event"].event_id: r for r in results} rank_map.update({r["event"].event_id: r["rank"] for r in results}) @@ -187,11 +194,16 @@ class SearchHandler(BaseHandler): # But only go around 5 times since otherwise synapse will be sad. while len(room_events) < search_filter.limit() and i < 5: i += 1 - results = yield self.store.search_room( + search_result = yield self.store.search_room( room_id, search_term, keys, search_filter.limit() * 2, pagination_token=pagination_token, ) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + results_map = {r["event"].event_id: r for r in results} rank_map.update({r["event"].event_id: r["rank"] for r in results}) @@ -347,7 +359,8 @@ class SearchHandler(BaseHandler): rooms_cat_res = { "results": results, - "count": len(results) + "count": len(results), + "highlights": list(highlights), } if state_results: diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 380270b009..c6386642df 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -20,6 +20,7 @@ from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging +import re logger = logging.getLogger(__name__) @@ -194,14 +195,21 @@ class SearchStore(BackgroundUpdateStore): for ev in events } - defer.returnValue([ - { - "event": event_map[r["event_id"]], - "rank": r["rank"], - } - for r in results - if r["event_id"] in event_map - ]) + highlights = None + if isinstance(self.database_engine, PostgresEngine): + highlights = yield self._find_highlights_in_postgres(search_term, events) + + defer.returnValue({ + "results": [ + { + "event": event_map[r["event_id"]], + "rank": r["rank"], + } + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + }) @defer.inlineCallbacks def search_room(self, room_id, search_term, keys, limit, pagination_token=None): @@ -294,14 +302,91 @@ class SearchStore(BackgroundUpdateStore): for ev in events } - defer.returnValue([ - { - "event": event_map[r["event_id"]], - "rank": r["rank"], - "pagination_token": "%s,%s" % ( - r["topological_ordering"], r["stream_ordering"] - ), - } - for r in results - if r["event_id"] in event_map - ]) + highlights = None + if isinstance(self.database_engine, PostgresEngine): + highlights = yield self._find_highlights_in_postgres(search_term, events) + + defer.returnValue({ + "results": [ + { + "event": event_map[r["event_id"]], + "rank": r["rank"], + "pagination_token": "%s,%s" % ( + r["topological_ordering"], r["stream_ordering"] + ), + } + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + }) + + def _find_highlights_in_postgres(self, search_term, events): + """Given a list of events and a search term, return a list of words + that match from the content of the event. + + This is used to give a list of words that clients can match against to + highlight the matching parts. + + Args: + search_term (str) + events (list): A list of events + + Returns: + deferred : A set of strings. + """ + def f(txn): + highlight_words = set() + for event in events: + # As a hack we simply join values of all possible keys. This is + # fine since we're only using them to find possible highlights. + values = [] + for key in ("body", "name", "topic"): + v = event.content.get(key, None) + if v: + values.append(v) + + if not values: + continue + + value = " ".join(values) + + # We need to find some values for StartSel and StopSel that + # aren't in the value so that we can pick results out. + start_sel = "<" + stop_sel = ">" + + while start_sel in value: + start_sel += "<" + while stop_sel in value: + stop_sel += ">" + + query = "SELECT ts_headline(?, plainto_tsquery('english', ?), %s)" % ( + _to_postgres_options({ + "StartSel": start_sel, + "StopSel": stop_sel, + "MaxFragments": "50", + }) + ) + txn.execute(query, (value, search_term,)) + headline, = txn.fetchall()[0] + + # Now we need to pick the possible highlights out of the haedline + # result. + matcher_regex = "%s(.*?)%s" % ( + re.escape(start_sel), + re.escape(stop_sel), + ) + + res = re.findall(matcher_regex, headline) + highlight_words.update([r.lower() for r in res]) + + return highlight_words + + return self.runInteraction("_find_highlights", f) + + +def _to_postgres_options(options_dict): + return "'%s'" % ( + ",".join("%s=%s" % (k, v) for k, v in options_dict.items()), + ) -- cgit 1.5.1 From 4dcaa42b6d1fde87e29eb4f3c0080ea92fcc7fa2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Nov 2015 17:45:31 +0000 Subject: Allow paginating search ordered by recents --- synapse/handlers/search.py | 128 ++++++++++++++++------------------ synapse/storage/events.py | 77 ++++++++++++++++++++ synapse/storage/schema/delta/26/ts.py | 57 +++++++++++++++ synapse/storage/search.py | 41 ++++++----- 4 files changed, 219 insertions(+), 84 deletions(-) create mode 100644 synapse/storage/schema/delta/26/ts.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 6d2197339e..671dbb61b8 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -131,6 +131,17 @@ class SearchHandler(BaseHandler): if batch_group == "room_id": room_ids.intersection_update({batch_group_key}) + if not room_ids: + defer.returnValue({ + "search_categories": { + "room_events": { + "results": {}, + "count": 0, + "highlights": [], + } + } + }) + rank_map = {} # event_id -> rank of event allowed_events = [] room_groups = {} # Holds result of grouping by room, if applicable @@ -178,85 +189,66 @@ class SearchHandler(BaseHandler): s["results"].append(e.event_id) elif order_by == "recent": - # In this case we specifically loop through each room as the given - # limit applies to each room, rather than a global list. - # This is not necessarilly a good idea. - for room_id in room_ids: - room_events = [] - if batch_group == "room_id" and batch_group_key == room_id: - pagination_token = batch_token - else: - pagination_token = None - i = 0 - - # We keep looping and we keep filtering until we reach the limit - # or we run out of things. - # But only go around 5 times since otherwise synapse will be sad. - while len(room_events) < search_filter.limit() and i < 5: - i += 1 - search_result = yield self.store.search_room( - room_id, search_term, keys, search_filter.limit() * 2, - pagination_token=pagination_token, - ) + room_events = [] + i = 0 + + pagination_token = batch_token + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit() and i < 5: + i += 1 + search_result = yield self.store.search_rooms( + room_ids, search_term, keys, search_filter.limit() * 2, + pagination_token=pagination_token, + ) - if search_result["highlights"]: - highlights.update(search_result["highlights"]) + if search_result["highlights"]: + highlights.update(search_result["highlights"]) - results = search_result["results"] + results = search_result["results"] - results_map = {r["event"].event_id: r for r in results} + results_map = {r["event"].event_id: r for r in results} - rank_map.update({r["event"].event_id: r["rank"] for r in results}) + rank_map.update({r["event"].event_id: r["rank"] for r in results}) - filtered_events = search_filter.filter([ - r["event"] for r in results - ]) + filtered_events = search_filter.filter([ + r["event"] for r in results + ]) - events = yield self._filter_events_for_client( - user.to_string(), filtered_events - ) + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) - room_events.extend(events) - room_events = room_events[:search_filter.limit()] + room_events.extend(events) + room_events = room_events[:search_filter.limit()] - if len(results) < search_filter.limit() * 2: - pagination_token = None - break - else: - pagination_token = results[-1]["pagination_token"] - - if room_events: - res = results_map[room_events[-1].event_id] - pagination_token = res["pagination_token"] - - group = room_groups.setdefault(room_id, {}) - if pagination_token: - next_batch = encode_base64("%s\n%s\n%s" % ( - "room_id", room_id, pagination_token - )) - group["next_batch"] = next_batch - - if batch_token: - global_next_batch = next_batch - - group["results"] = [e.event_id for e in room_events] - group["order"] = max( - e.origin_server_ts/1000 for e in room_events - if hasattr(e, "origin_server_ts") - ) + if len(results) < search_filter.limit() * 2: + pagination_token = None + else: + pagination_token = results[-1]["pagination_token"] - allowed_events.extend(room_events) + if room_events: + for event in room_events: + group = room_groups.setdefault(event.room_id, { + "results": [], + }) + group["results"].append(event.event_id) - # Normalize the group orders - if room_groups: - if len(room_groups) > 1: - mx = max(g["order"] for g in room_groups.values()) - mn = min(g["order"] for g in room_groups.values()) + pagination_token = results_map[room_events[-1].event_id]["pagination_token"] - for g in room_groups.values(): - g["order"] = (g["order"] - mn) * 1.0 / (mx - mn) - else: - room_groups.values()[0]["order"] = 1 + if pagination_token: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + "all", "", pagination_token + )) + + for room_id, group in room_groups.items(): + group["next_batch"] = encode_base64("%s\n%s\n%s" % ( + "room_id", room_id, pagination_token + )) + + allowed_events.extend(room_events) else: # We should never get here due to the guard earlier. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d35ca90b9..7088f2709b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events class EventsStore(SQLBaseStore): + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + + def __init__(self, hs): + super(EventsStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts + ) + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False, is_new_state=True): @@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore): "processed": True, "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), + "origin_server_ts": int(event.origin_server_ts), } for event, _ in events_and_contexts ], @@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore): ret = yield self.runInteraction("count_messages", _count_messages) defer.returnValue(ret) + + @defer.inlineCallbacks + def _background_reindex_origin_server_ts(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_ordering, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + rows = [] + for event in events: + try: + event_id = event.event_id + origin_server_ts = event.origin_server_ts + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + rows.append((origin_server_ts, event_id)) + + sql = ( + "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" + ) + + for index in range(0, len(rows), INSERT_CLUMP_SIZE): + clump = rows[index:index + INSERT_CLUMP_SIZE] + txn.executemany(sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress + ) + + return len(rows) + + result = yield self.runInteraction( + self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME) + + defer.returnValue(result) diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/26/ts.py new file mode 100644 index 0000000000..8d4a981975 --- /dev/null +++ b/synapse/storage/schema/delta/26/ts.py @@ -0,0 +1,57 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements + +import ujson + +logger = logging.getLogger(__name__) + + +ALTER_TABLE = ( + "ALTER TABLE events ADD COLUMN origin_server_ts BIGINT;" + "CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);" +) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + for statement in get_statements(ALTER_TABLE.splitlines()): + cur.execute(statement) + + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + + sql = ( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)" + ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_origin_server_ts", progress_json)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index c6386642df..20a62d07ff 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -212,11 +212,11 @@ class SearchStore(BackgroundUpdateStore): }) @defer.inlineCallbacks - def search_room(self, room_id, search_term, keys, limit, pagination_token=None): + def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): """Performs a full text search over events with given keys. Args: - room_id (str): The room_id to search in + room_id (list): The room_ids to search in search_term (str): Search term to search for keys (list): List of keys to search in, currently supports "content.body", "content.name", "content.topic" @@ -226,7 +226,15 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [search_term, room_id] + args = [search_term] + + # Make sure we don't explode because the person is in too many rooms. + # We filter the results below regardless. + if len(room_ids) < 500: + clauses.append( + "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),) + ) + args.extend(room_ids) local_clauses = [] for key in keys: @@ -239,25 +247,25 @@ class SearchStore(BackgroundUpdateStore): if pagination_token: try: - topo, stream = pagination_token.split(",") - topo = int(topo) + origin_server_ts, stream = pagination_token.split(",") + origin_server_ts = int(origin_server_ts) stream = int(stream) except: raise SynapseError(400, "Invalid pagination token") clauses.append( - "(topological_ordering < ?" - " OR (topological_ordering = ? AND stream_ordering < ?))" + "(origin_server_ts < ?" + " OR (origin_server_ts = ? AND stream_ordering < ?))" ) - args.extend([topo, topo, stream]) + args.extend([origin_server_ts, origin_server_ts, stream]) if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, query) as rank," - " topological_ordering, stream_ordering, room_id, event_id" + " origin_server_ts, stream_ordering, room_id, event_id" " FROM plainto_tsquery('english', ?) as query, event_search" " NATURAL JOIN events" - " WHERE vector @@ query AND room_id = ?" + " WHERE vector @@ query AND " ) elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. @@ -270,24 +278,23 @@ class SearchStore(BackgroundUpdateStore): # MATCH unless it uses the full text search index sql = ( "SELECT rank(matchinfo) as rank, room_id, event_id," - " topological_ordering, stream_ordering" + " origin_server_ts, stream_ordering" " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" " FROM event_search" " WHERE value MATCH ?" " )" " CROSS JOIN events USING (event_id)" - " WHERE room_id = ?" + " WHERE " ) else: # This should be unreachable. raise Exception("Unrecognized database engine") - for clause in clauses: - sql += " AND " + clause + sql += " AND ".join(clauses) # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. - sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?" args.append(limit) @@ -295,6 +302,8 @@ class SearchStore(BackgroundUpdateStore): "search_rooms", self.cursor_to_dict, sql, *args ) + results = filter(lambda row: row["room_id"] in room_ids, results) + events = yield self._get_events([r["event_id"] for r in results]) event_map = { @@ -312,7 +321,7 @@ class SearchStore(BackgroundUpdateStore): "event": event_map[r["event_id"]], "rank": r["rank"], "pagination_token": "%s,%s" % ( - r["topological_ordering"], r["stream_ordering"] + r["origin_server_ts"], r["stream_ordering"] ), } for r in results -- cgit 1.5.1 From da7dd586414653a3d7d3ae4225600cb5126059f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Dec 2015 11:06:40 +0000 Subject: Tidy up a bit --- synapse/handlers/search.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 671dbb61b8..df6390cf05 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -226,19 +226,20 @@ class SearchHandler(BaseHandler): if len(results) < search_filter.limit() * 2: pagination_token = None + break else: pagination_token = results[-1]["pagination_token"] - if room_events: - for event in room_events: - group = room_groups.setdefault(event.room_id, { - "results": [], - }) - group["results"].append(event.event_id) + for event in room_events: + group = room_groups.setdefault(event.room_id, { + "results": [], + }) + group["results"].append(event.event_id) - pagination_token = results_map[room_events[-1].event_id]["pagination_token"] + if room_events and len(room_events) >= search_filter.limit(): + last_event_id = room_events[-1].event_id + pagination_token = results_map[last_event_id]["pagination_token"] - if pagination_token: global_next_batch = encode_base64("%s\n%s\n%s" % ( "all", "", pagination_token )) -- cgit 1.5.1 From 71578e2bf255ca434edbafd3919aa56c3ebf1b48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Dec 2015 14:48:35 +0000 Subject: Change the result tict to be a list --- synapse/handlers/search.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index df6390cf05..0e58404148 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -135,7 +135,7 @@ class SearchHandler(BaseHandler): defer.returnValue({ "search_categories": { "room_events": { - "results": {}, + "results": [], "count": 0, "highlights": [], } @@ -339,16 +339,14 @@ class SearchHandler(BaseHandler): # We're now about to serialize the events. We should not make any # blocking calls after this. Otherwise the 'age' will be wrong - results = { - e.event_id: { + results = [ + { "rank": rank_map[e.event_id], "result": serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } for e in allowed_events - } - - logger.info("Found %d results", len(results)) + ] rooms_cat_res = { "results": results, -- cgit 1.5.1 From 31069ecf6a9c91e62fcecab9059385e33a19a629 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Dec 2015 15:59:45 +0000 Subject: Rename presence_handler.send_invite to presence_handler.send_presence_invite to distinguish it from normal invites --- synapse/handlers/presence.py | 2 +- synapse/rest/client/v1/presence.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aca65096fc..e95e821c9a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -467,7 +467,7 @@ class PresenceHandler(BaseHandler): ) @defer.inlineCallbacks - def send_invite(self, observer_user, observed_user): + def send_presence_invite(self, observer_user, observed_user): """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 6fe5d19a22..48533f9d60 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -120,7 +120,7 @@ class PresenceListRestServlet(ClientV1RestServlet): if len(u) == 0: continue invited_user = UserID.from_string(u) - yield self.handlers.presence_handler.send_invite( + yield self.handlers.presence_handler.send_presence_invite( observer_user=user, observed_user=invited_user ) -- cgit 1.5.1 From 3d3da2b4609d02bbbb276313fd6b2cc8069e213d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Dec 2015 16:03:08 +0000 Subject: Only fire user_joined_room on the distributor if the user has actually joined the room --- synapse/handlers/room.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 023b4001b8..cb6ac37758 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -517,10 +517,12 @@ class RoomMemberHandler(BaseHandler): do_auth=do_auth, ) - user = UserID.from_string(event.user_id) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + user = UserID.from_string(event.user_id) + yield self.distributor.fire( + "user_joined_room", user=user, room_id=room_id + ) @defer.inlineCallbacks def get_inviter(self, event): -- cgit 1.5.1 From 27c5e1b37442e310aa71d997478f8e61bce1672c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Dec 2015 16:36:46 +0000 Subject: Search: Don't disregard grouping info in pagination tokens --- synapse/handlers/search.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index df6390cf05..65ef2f85bf 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -240,9 +240,18 @@ class SearchHandler(BaseHandler): last_event_id = room_events[-1].event_id pagination_token = results_map[last_event_id]["pagination_token"] - global_next_batch = encode_base64("%s\n%s\n%s" % ( - "all", "", pagination_token - )) + # We want to respect the given batch group and group keys so + # that if people blindly use the top level `next_batch` token + # it returns more from the same group (if applicable) rather + # than reverting to searching all results again. + if batch_group and batch_group_key: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + batch_group, batch_group_key, pagination_token + )) + else: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + "all", "", pagination_token + )) for room_id, group in room_groups.items(): group["next_batch"] = encode_base64("%s\n%s\n%s" % ( -- cgit 1.5.1 From 95f30ecd1f90cd143c908589b600742148491c15 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Dec 2015 18:41:32 +0000 Subject: Add API for setting account_data globaly or on a per room basis --- synapse/api/filtering.py | 9 +- synapse/handlers/account_data.py | 21 ++- synapse/handlers/message.py | 40 ++++- synapse/handlers/sync.py | 72 ++++++-- synapse/rest/client/v2_alpha/__init__.py | 2 + synapse/rest/client/v2_alpha/account_data.py | 111 ++++++++++++ synapse/rest/client/v2_alpha/sync.py | 6 + synapse/storage/__init__.py | 2 + synapse/storage/account_data.py | 211 +++++++++++++++++++++++ synapse/storage/schema/delta/26/account_data.sql | 23 +++ synapse/storage/tags.py | 4 +- 11 files changed, 476 insertions(+), 25 deletions(-) create mode 100644 synapse/rest/client/v2_alpha/account_data.py create mode 100644 synapse/storage/account_data.py (limited to 'synapse/handlers') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 18f2ec3ae8..19f30c273c 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -50,7 +50,7 @@ class Filtering(object): # many definitions. top_level_definitions = [ - "presence" + "presence", "account_data" ] room_level_definitions = [ @@ -139,6 +139,10 @@ class FilterCollection(object): self.filter_json.get("presence", {}) ) + self.account_data = Filter( + self.filter_json.get("account_data", {}) + ) + def timeline_limit(self): return self.room_timeline_filter.limit() @@ -151,6 +155,9 @@ class FilterCollection(object): def filter_presence(self, events): return self.presence_filter.filter(events) + def filter_account_data(self, events): + return self.account_data.filter(events) + def filter_room_state(self, events): return self.room_state_filter.filter(events) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 1d35d3b7dc..fe773bee9b 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -29,9 +29,10 @@ class AccountDataEventSource(object): last_stream_id = from_key current_stream_id = yield self.store.get_max_account_data_stream_id() - tags = yield self.store.get_updated_tags(user_id, last_stream_id) results = [] + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + for room_id, room_tags in tags.items(): results.append({ "type": "m.tag", @@ -39,6 +40,24 @@ class AccountDataEventSource(object): "room_id": room_id, }) + account_data, room_account_data = ( + yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) + ) + + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + }) + + for room_id, account_data in room_account_data.items(): + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + "room_id": room_id, + }) + defer.returnValue((results, current_stream_id)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 64c57375f7..e959ce50be 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -359,6 +359,10 @@ class MessageHandler(BaseHandler): tags_by_room = yield self.store.get_tags_for_user(user_id) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user(user_id) + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -436,14 +440,22 @@ class MessageHandler(BaseHandler): for c in current_state.values() ] - account_data = [] + account_data_events = [] tags = tags_by_room.get(event.room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - d["account_data"] = account_data + + account_data = account_data_by_room.get(event.room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + d["account_data"] = account_data_events except: logger.exception("Failed to get snapshot") @@ -456,9 +468,17 @@ class MessageHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) + account_data_events = [] + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + ret = { "rooms": rooms_ret, "presence": presence, + "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), } @@ -498,14 +518,22 @@ class MessageHandler(BaseHandler): user_id, room_id, pagin_config, membership, member_event_id, is_guest ) - account_data = [] + account_data_events = [] tags = yield self.store.get_tags_for_room(user_id, room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - result["account_data"] = account_data + + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + result["account_data"] = account_data_events defer.returnValue(result) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 877328b29e..943ce368ef 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -100,6 +100,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. + "account_data", # List of account_data events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. @@ -195,6 +196,12 @@ class SyncHandler(BaseHandler): ) ) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + tags_by_room = yield self.store.get_tags_for_user( sync_config.user.to_string() ) @@ -211,6 +218,7 @@ class SyncHandler(BaseHandler): timeline_since_token=timeline_since_token, ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -230,11 +238,13 @@ class SyncHandler(BaseHandler): leave_token=leave_token, timeline_since_token=timeline_since_token, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -244,7 +254,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -262,19 +273,38 @@ class SyncHandler(BaseHandler): state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) - def account_data_for_room(self, room_id, tags_by_room): - account_data = [] + def account_data_for_user(self, account_data): + account_data_events = [] + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events + + def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): + account_data_events = [] tags = tags_by_room.get(room_id) if tags is not None: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - return account_data + + account_data = account_data_by_room.get(room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): @@ -341,7 +371,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token, tags_by_room): + timeline_since_token, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -358,7 +389,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=leave_state, account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) @@ -415,6 +446,13 @@ class SyncHandler(BaseHandler): since_token.account_data_key, ) + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + sync_config.user.to_string(), + since_token.account_data_key, + ) + ) + joined = [] archived = [] if len(room_events) <= timeline_limit: @@ -469,7 +507,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -492,14 +530,15 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room + ephemeral_by_room, tags_by_room, account_data_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room + sync_config, leave_event, since_token, tags_by_room, + account_data_by_room ) archived.append(room_sync) @@ -510,6 +549,7 @@ class SyncHandler(BaseHandler): defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -566,7 +606,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -606,7 +647,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) @@ -616,7 +657,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token, tags_by_room): + since_token, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -654,7 +696,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=state_events_delta, account_data=self.account_data_for_room( - leave_event.room_id, tags_by_room + leave_event.room_id, tags_by_room, account_data_by_room ), ) diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index a108132346..d7b59c84d1 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -23,6 +23,7 @@ from . import ( keys, tokenrefresh, tags, + account_data, ) from synapse.http.server import JsonResource @@ -46,3 +47,4 @@ class ClientV2AlphaRestResource(JsonResource): keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) + account_data.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py new file mode 100644 index 0000000000..5b8f454bf1 --- /dev/null +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet +from synapse.api.errors import AuthError, SynapseError + +from twisted.internet import defer + +import logging + +import simplejson as json + +logger = logging.getLogger(__name__) + + +class AccountDataServlet(RestServlet): + """ + PUT /user/{user_id}/account_data/{account_dataType} HTTP/1.1 + """ + PATTERNS = client_v2_patterns( + "/user/(?P[^/]*)/account_data/(?P[^/]*)" + ) + + def __init__(self, hs): + super(AccountDataServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, account_data_type): + auth_user, _, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add account data for other users.") + + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid JSON") + + max_id = yield self.store.add_account_data_for_user( + user_id, account_data_type, body + ) + + yield self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + defer.returnValue((200, {})) + + +class RoomAccountDataServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/account_data/{account_dataType} HTTP/1.1 + """ + PATTERNS = client_v2_patterns( + "/user/(?P[^/]*)" + "/rooms/(?P[^/]*)" + "/account_data/(?P[^/]*)" + ) + + def __init__(self, hs): + super(RoomAccountDataServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, room_id, account_data_type): + auth_user, _, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add account data for other users.") + + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid JSON") + + if not isinstance(body, dict): + raise ValueError("Expected a JSON object") + + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, account_data_type, body + ) + + yield self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + AccountDataServlet(hs).register(http_server) + RoomAccountDataServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 09693bb435..4efe802487 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -144,6 +144,9 @@ class SyncRestServlet(RestServlet): ) response_content = { + "account_data": self.encode_account_data( + sync_result.account_data, filter, time_now + ), "presence": self.encode_presence( sync_result.presence, filter, time_now ), @@ -165,6 +168,9 @@ class SyncRestServlet(RestServlet): formatted.append(event) return {"events": filter.filter_presence(formatted)} + def encode_account_data(self, events, filter, time_now): + return {"events": filter.filter_account_data(events)} + def encode_joined(self, rooms, filter, time_now, token_id): """ Encode the joined rooms in a sync result diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e7443f2838..c46b653f11 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -42,6 +42,7 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore +from .account_data import AccountDataStore import logging @@ -73,6 +74,7 @@ class DataStore(RoomMemberStore, RoomStore, EndToEndKeyStore, SearchStore, TagsStore, + AccountDataStore, ): def __init__(self, hs): diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py new file mode 100644 index 0000000000..d1829f84e8 --- /dev/null +++ b/synapse/storage/account_data.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import ujson as json +import logging + +logger = logging.getLogger(__name__) + + +class AccountDataStore(SQLBaseStore): + + def get_account_data_for_user(self, user_id): + """Get all the client account_data for a user. + + Args: + user_id(str): The user to get the account_data for. + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_account_data_for_user_txn(txn): + rows = self._simple_select_list_txn( + txn, "account_data", {"user_id": user_id}, + ["account_data_type", "content"] + ) + + global_account_data = { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id}, + ["room_id", "account_data_type", "content"] + ) + + by_room = {} + for row in rows: + room_data = by_room.setdefault(row["room_id"], {}) + room_data[row["account_data_type"]] = json.loads(row["content"]) + + return (global_account_data, by_room) + + return self.runInteraction( + "get_account_data_for_user", get_account_data_for_user_txn + ) + + def get_account_data_for_room(self, user_id, room_id): + """Get all the client account_data for a user for a room. + + Args: + user_id(str): The user to get the account_data for. + room_id(str): The room to get the account_data for. + Returns: + A deferred dict of the room account_data + """ + def get_account_data_for_room_txn(txn): + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id, "room_id": room_id}, + ["account_data_type", "content"] + ) + + return { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + return self.runInteraction( + "get_account_data_for_room", get_account_data_for_room_txn + ) + + def get_updated_account_data_for_user(self, user_id, stream_id): + """Get all the client account_data for a that's changed. + + Args: + user_id(str): The user to get the account_data for. + stream_id(int): The point in the stream since which to get updates + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_updated_account_data_for_user_txn(txn): + sql = ( + "SELECT account_data_type, content FROM account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + global_account_data = { + row[0]: json.loads(row[1]) for row in txn.fetchall() + } + + sql = ( + "SELECT room_id, account_data_type, content FROM room_account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + account_data_by_room = {} + for row in txn.fetchall(): + room_account_data = account_data_by_room.setdefault(row[0], {}) + room_account_data[row[1]] = json.loads(row[2]) + + return (global_account_data, account_data_by_room) + + return self.runInteraction( + "get_updated_account_data_for_user", get_updated_account_data_for_user_txn + ) + + @defer.inlineCallbacks + def add_account_data_to_room(self, user_id, room_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + room_id(str): The room to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="room_account_data", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_room_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + @defer.inlineCallbacks + def add_account_data_for_user(self, user_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_user_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + def _update_max_stream_id(self, txn, next_id): + """Update the max stream_id + + Args: + txn: The database cursor + next_id(int): The the revision to advance to. + """ + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) diff --git a/synapse/storage/schema/delta/26/account_data.sql b/synapse/storage/schema/delta/26/account_data.sql index 3198a0d29c..48ad9cc6b8 100644 --- a/synapse/storage/schema/delta/26/account_data.sql +++ b/synapse/storage/schema/delta/26/account_data.sql @@ -15,3 +15,26 @@ ALTER TABLE private_user_data_max_stream_id RENAME TO account_data_max_stream_id; + + +CREATE TABLE IF NOT EXISTS account_data( + user_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) +); + + +CREATE TABLE IF NOT EXISTS room_account_data( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) +); + + +CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); +CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index f6d826cc59..f520f60c6c 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -48,8 +48,8 @@ class TagsStore(SQLBaseStore): Args: user_id(str): The user to get the tags for. Returns: - A deferred dict mapping from room_id strings to lists of tag - strings. + A deferred dict mapping from room_id strings to dicts mapping from + tag strings to tag content. """ deferred = self._simple_select_list( -- cgit 1.5.1 From ed0f79bdc5e507705655fa380394b8f4328f90e1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Dec 2015 19:46:15 +0000 Subject: Only fire user_joined_room if the membership has changed --- synapse/handlers/federation.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c1bce07e31..e5fb1dd3c9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -233,10 +233,15 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: - user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id + context = yield self.state_handler.compute_event_context( + event, old_state=state, outlier=event.internal_metadata.is_outlier() ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + user = UserID.from_string(event.state_key) + yield self.distributor.fire( + "user_joined_room", user=user, room_id=event.room_id + ) @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): -- cgit 1.5.1 From a9526831a45403d3da8165f7832cefc61282723c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 1 Dec 2015 20:53:04 +0000 Subject: Wrap calls to distributor.fire in appropriately named functions so that static analysis can work out want is calling what --- synapse/handlers/events.py | 20 +++++++++++++++----- synapse/handlers/federation.py | 19 ++++++++----------- synapse/handlers/message.py | 10 ++++++---- synapse/handlers/presence.py | 18 +++++++++++------- synapse/handlers/profile.py | 28 +++++++++++++++------------- synapse/handlers/register.py | 12 ++++++++---- synapse/handlers/room.py | 24 +++++++++++++++--------- 7 files changed, 78 insertions(+), 53 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0e4c0d4d06..fe300433e6 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -28,6 +28,18 @@ import random logger = logging.getLogger(__name__) +def started_user_eventstream(distributor, user): + return distributor.fire("started_user_eventstream", user) + + +def stopped_user_eventstream(distributor, user): + return distributor.fire("stopped_user_eventstream", user) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -66,7 +78,7 @@ class EventStreamHandler(BaseHandler): except: logger.exception("Failed to cancel event timer") else: - yield self.distributor.fire("started_user_eventstream", user) + yield started_user_eventstream(self.distributor, user) self._streams_per_user[user] += 1 @@ -89,7 +101,7 @@ class EventStreamHandler(BaseHandler): self._stop_timer_per_user.pop(user, None) - return self.distributor.fire("stopped_user_eventstream", user) + return stopped_user_eventstream(self.distributor, user) logger.debug("Scheduling _later: for %s", user) self._stop_timer_per_user[user] = ( @@ -120,9 +132,7 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) if is_guest: - yield self.distributor.fire( - "user_joined_room", user=auth_user, room_id=room_id - ) + yield user_joined_room(self.distributor, auth_user, room_id) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c1bce07e31..6cb2f73ff4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,10 @@ import logging logger = logging.getLogger(__name__) +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -60,10 +64,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.distributor.observe( - "user_joined_room", - self._on_user_joined - ) + self.distributor.observe("user_joined_room", self.user_joined_room) self.waiting_for_join_list = {} @@ -234,9 +235,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + yield user_joined_room(self.distributor, user, event.room_id) @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): @@ -733,9 +732,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + yield user_joined_room(self.distributor, user, event.room_id) new_pdu = event @@ -1082,7 +1079,7 @@ class FederationHandler(BaseHandler): return self.store.get_min_depth(context) @log_function - def _on_user_joined(self, user, room_id): + def user_joined_room(self, user, room_id): waiters = self.waiting_for_join_list.get( (user.to_string(), room_id), [] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e959ce50be..cb0361ac49 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -31,6 +31,10 @@ import logging logger = logging.getLogger(__name__) +def collect_presencelike_data(distributor, user, content): + return distributor.fire("changed_presencelike_data", user, content) + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -195,10 +199,8 @@ class MessageHandler(BaseHandler): if membership == Membership.JOIN: joinee = UserID.from_string(builder.state_key) # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", - joinee, - builder.content + yield collect_presencelike_data( + self.distributor, joinee, builder.content ) if token_id is not None: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e95e821c9a..63d6f30a7b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -62,6 +62,14 @@ def partitionbool(l, func): return ret.get(True, []), ret.get(False, []) +def user_presence_changed(distributor, user, statuscache): + return distributor.fire("user_presence_changed", user, statuscache) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class PresenceHandler(BaseHandler): STATE_LEVELS = { @@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_state( target_user.localpart, state_to_store ) - yield self.distributor.fire( - "collect_presencelike_data", target_user, state - ) + yield collect_presencelike_data(self.distributor, target_user, state) if now_level > was_level: state["last_active"] = self.clock.time_msec() @@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler): room_ids=room_ids, statuscache=statuscache, ) - yield self.distributor.fire("user_presence_changed", user, statuscache) + yield user_presence_changed(self.distributor, user, statuscache) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap[user].get_state()["last_active"] ) - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) + yield collect_presencelike_data(self.distributor, user, state) if "last_active" in state: state = dict(state) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 799faffe53..576c6f09b4 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -28,6 +28,14 @@ import logging logger = logging.getLogger(__name__) +def changed_presencelike_data(distributor, user, state): + return distributor.fire("changed_presencelike_data", user, state) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class ProfileHandler(BaseHandler): def __init__(self, hs): @@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "displayname": new_displayname, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "displayname": new_displayname, + }) yield self._update_join_states(target_user) @@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "avatar_url": new_avatar_url, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "avatar_url": new_avatar_url, + }) yield self._update_join_states(target_user) @@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler): "membership": Membership.JOIN, } - yield self.distributor.fire( - "collect_presencelike_data", user, content - ) + yield collect_presencelike_data(self.distributor, user, content) msg_handler = self.hs.get_handlers().message_handler try: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 493a087031..5166bc7b62 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -31,6 +31,10 @@ import urllib logger = logging.getLogger(__name__) +def registered_user(distributor, user): + return distributor.fire("registered_user", user) + + class RegistrationHandler(BaseHandler): def __init__(self, hs): @@ -98,7 +102,7 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) else: # autogen a random user ID attempts = 0 @@ -117,7 +121,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash) - self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except SynapseError: # if user id is taken, just generate another user_id = None @@ -167,7 +171,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - self.distributor.fire("registered_user", user) + registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -215,7 +219,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=None ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except Exception, e: yield self.store.add_access_token_to_user(user_id, token) # Ignore Registration errors diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 023b4001b8..38bf2ef711 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -41,6 +41,18 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + +def user_left_room(distributor, user, room_id): + return distributor.fire("user_left_room", user=user, room_id=room_id) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user=user, room_id=room_id) + + class RoomCreationHandler(BaseHandler): PRESETS_DICT = { @@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.JOIN: user = UserID.from_string(event.user_id) - self.distributor.fire( - "user_left_room", user=user, room_id=event.room_id - ) + user_left_room(self.distributor, user, event.room_id) defer.returnValue({"room_id": room_id}) @@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler): raise SynapseError(404, "No known servers") # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", joinee, content - ) + yield collect_presencelike_data(self.distributor, joinee, content) content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ @@ -518,9 +526,7 @@ class RoomMemberHandler(BaseHandler): ) user = UserID.from_string(event.user_id) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=room_id - ) + yield user_joined_room(self.distributor, user, room_id) @defer.inlineCallbacks def get_inviter(self, event): -- cgit 1.5.1 From c30cdb0d6881a3f463574043637072b71d1a3ab2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Dec 2015 10:49:35 +0000 Subject: Add comments --- synapse/handlers/federation.py | 3 +++ synapse/handlers/room.py | 3 +++ 2 files changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a03a5f494c..5f3562b5b5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -239,6 +239,9 @@ class FederationHandler(BaseHandler): ) prev_state = context.current_state.get((event.type, event.state_key)) if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally + # joined the room. Don't bother if the user is just + # changing their profile info. user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 1e18038e15..116a998c42 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -527,6 +527,9 @@ class RoomMemberHandler(BaseHandler): prev_state = context.current_state.get((event.type, event.state_key)) if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. user = UserID.from_string(event.user_id) yield user_joined_room(self.distributor, user, room_id) -- cgit 1.5.1 From 5eb4d13aaa6c7ed447680b626257d40ed2421123 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Dec 2015 10:50:58 +0000 Subject: Fix typo in collect_presencelike_data --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index cb0361ac49..c972e8cd4c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) def collect_presencelike_data(distributor, user, content): - return distributor.fire("changed_presencelike_data", user, content) + return distributor.fire("collect_presencelike_data", user, content) class MessageHandler(BaseHandler): -- cgit 1.5.1 From 37b2d69bbcdc8df40712799bf438a7c1463b5bc2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Dec 2015 11:36:02 +0000 Subject: Reuse a single http client, rather than creating new ones --- synapse/handlers/identity.py | 14 +++++--------- synapse/push/httppusher.py | 7 +++---- synapse/rest/client/v1/login.py | 7 ++----- 3 files changed, 10 insertions(+), 18 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 2a99921d5f..f1fa562fff 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -20,7 +20,6 @@ from synapse.api.errors import ( CodeMessageException ) from ._base import BaseHandler -from synapse.http.client import SimpleHttpClient from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError @@ -35,13 +34,12 @@ class IdentityHandler(BaseHandler): def __init__(self, hs): super(IdentityHandler, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + @defer.inlineCallbacks def threepid_from_creds(self, creds): yield run_on_reactor() - # TODO: get this from the homeserver rather than creating a new one for - # each request - http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] trustedIdServers = ['matrix.org', 'vector.im'] @@ -67,7 +65,7 @@ class IdentityHandler(BaseHandler): data = {} try: - data = yield http_client.get_json( + data = yield self.http_client.get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" @@ -85,7 +83,6 @@ class IdentityHandler(BaseHandler): def bind_threepid(self, creds, mxid): yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) - http_client = SimpleHttpClient(self.hs) data = None if 'id_server' in creds: @@ -103,7 +100,7 @@ class IdentityHandler(BaseHandler): raise SynapseError(400, "No client_secret in creds") try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), @@ -121,7 +118,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): yield run_on_reactor() - http_client = SimpleHttpClient(self.hs) params = { 'email': email, @@ -131,7 +127,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index a02fed57b4..5160775e59 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -14,7 +14,6 @@ # limitations under the License. from synapse.push import Pusher, PusherConfigException -from synapse.http.client import SimpleHttpClient from twisted.internet import defer @@ -46,7 +45,7 @@ class HttpPusher(Pusher): "'url' required in data for HTTP pusher" ) self.url = data['url'] - self.httpCli = SimpleHttpClient(self.hs) + self.http_client = _hs.get_simple_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url['url'] @@ -107,7 +106,7 @@ class HttpPusher(Pusher): if not notification_dict: defer.returnValue([]) try: - resp = yield self.httpCli.post_json_get_json(self.url, notification_dict) + resp = yield self.http_client.post_json_get_json(self.url, notification_dict) except: logger.warn("Failed to push %s ", self.url) defer.returnValue(False) @@ -138,7 +137,7 @@ class HttpPusher(Pusher): } } try: - resp = yield self.httpCli.post_json_get_json(self.url, d) + resp = yield self.http_client.post_json_get_json(self.url, d) except: logger.exception("Failed to push %s ", self.url) defer.returnValue(False) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index b0b641e430..ad17900c0d 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -16,7 +16,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, LoginError, Codes -from synapse.http.client import SimpleHttpClient from synapse.types import UserID from base import ClientV1RestServlet, client_path_patterns @@ -51,6 +50,7 @@ class LoginRestServlet(ClientV1RestServlet): self.cas_server_url = hs.config.cas_server_url self.cas_required_attributes = hs.config.cas_required_attributes self.servername = hs.config.server_name + self.http_client = hs.get_simple_http_client() def on_GET(self, request): flows = [] @@ -98,15 +98,12 @@ class LoginRestServlet(ClientV1RestServlet): # TODO Delete this after all CAS clients switch to token login instead elif self.cas_enabled and (login_submission["type"] == LoginRestServlet.CAS_TYPE): - # TODO: get this from the homeserver rather than creating a new one for - # each request - http_client = SimpleHttpClient(self.hs) uri = "%s/proxyValidate" % (self.cas_server_url,) args = { "ticket": login_submission["ticket"], "service": login_submission["service"] } - body = yield http_client.get_raw(uri, args) + body = yield self.http_client.get_raw(uri, args) result = yield self.do_cas_login(body) defer.returnValue(result) elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE: -- cgit 1.5.1 From c2c70f7daf5ea1b638e7366f57570417155ab7e2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Dec 2015 12:01:24 +0000 Subject: Use the context returned by _handle_new_event --- synapse/handlers/federation.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5f3562b5b5..2855f2d7c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -177,7 +177,7 @@ class FederationHandler(BaseHandler): ) try: - _, event_stream_id, max_stream_id = yield self._handle_new_event( + context, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -234,9 +234,6 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: - context = yield self.state_handler.compute_event_context( - event, old_state=state, outlier=event.internal_metadata.is_outlier() - ) prev_state = context.current_state.get((event.type, event.state_key)) if not prev_state or prev_state.membership != Membership.JOIN: # Only fire user_joined_room if the user has acutally -- cgit 1.5.1 From 491f3d16dc222113d0a6b0bab75a6aaafee92e0d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Dec 2015 15:50:50 +0000 Subject: Make state updates in the C+S API idempotent --- synapse/handlers/message.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c972e8cd4c..ccdd3d8473 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,6 +26,8 @@ from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler +from canonicaljson import encode_canonical_json + import logging logger = logging.getLogger(__name__) @@ -213,6 +215,16 @@ class MessageHandler(BaseHandler): builder=builder, ) + if event.is_state(): + prev_state = context.current_state.get((event.type, event.state_key)) + if prev_state and event.user_id == prev_state.user_id: + prev_content = encode_canonical_json(prev_state.content) + next_content = encode_canonical_json(event.content) + if prev_content == next_content: + # Duplicate suppression for state updates with same sender + # and content. + defer.returnValue(prev_state) + if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler yield member_handler.change_membership(event, context, is_guest=is_guest) -- cgit 1.5.1 From 526bc33e023e9a588c1b96e500c15baf90c022fb Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 2 Dec 2015 17:27:49 +0000 Subject: Fix implementation of /admin/whois --- synapse/handlers/admin.py | 28 +++++++++++----------------- synapse/rest/client/v1/admin.py | 2 +- 2 files changed, 12 insertions(+), 18 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index d852a18555..5ba3c7039a 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -30,33 +30,27 @@ class AdminHandler(BaseHandler): @defer.inlineCallbacks def get_whois(self, user): - res = yield self.store.get_user_ip_and_agents(user) - - d = {} - for r in res: - # Note that device_id is always None - device = d.setdefault(r["device_id"], {}) - session = device.setdefault(r["access_token"], []) - session.append({ - "ip": r["ip"], - "user_agent": r["user_agent"], - "last_seen": r["last_seen"], + connections = [] + + sessions = yield self.store.get_user_ip_and_agents(user) + for session in sessions: + connections.append({ + "ip": session["ip"], + "last_seen": session["last_seen"], + "user_agent": session["user_agent"], }) ret = { "user_id": user.to_string(), "devices": [ { - "device_id": k, + "device_id": None, "sessions": [ { - # "access_token": x, TODO (erikj) - "connections": y, + "connections": connections, } - for x, y in v.items() ] - } - for k, v in d.items() + }, ], } diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 0103697889..886199a6da 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class WhoisRestServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/admin/whois/(?P[^/]*)", releases=()) + PATTERNS = client_path_patterns("/admin/whois/(?P[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): -- cgit 1.5.1 From 478b4e3ed444fc58713d62039dee613f9c057a46 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 3 Dec 2015 13:47:50 +0000 Subject: Reuse the captcha client rather than creating a new one for each request --- synapse/handlers/register.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5166bc7b62..a037da0f70 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -42,6 +42,7 @@ class RegistrationHandler(BaseHandler): self.distributor = hs.get_distributor() self.distributor.declare("registered_user") + self.captch_client = CaptchaServerHttpClient(hs) @defer.inlineCallbacks def check_username(self, localpart): @@ -306,10 +307,7 @@ class RegistrationHandler(BaseHandler): """ Used only by c/s api v1 """ - # TODO: get this from the homeserver rather than creating a new one for - # each request - client = CaptchaServerHttpClient(self.hs) - data = yield client.post_urlencoded_get_raw( + data = yield self.captcha_client.post_urlencoded_get_raw( "http://www.google.com:80/recaptcha/api/verify", args={ 'privatekey': private_key, -- cgit 1.5.1 From edfcb83473fa4af4ecd34884c780774aa5c5184a Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 3 Dec 2015 16:19:21 +0000 Subject: Flatten devices into a dict, not a list --- synapse/handlers/admin.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 5ba3c7039a..04fa58df65 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -42,16 +42,15 @@ class AdminHandler(BaseHandler): ret = { "user_id": user.to_string(), - "devices": [ - { - "device_id": None, + "devices": { + "": { "sessions": [ { "connections": connections, } ] }, - ], + }, } defer.returnValue(ret) -- cgit 1.5.1 From 660dee94afdcb1059cb7074f1428fbfaa8c57465 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 4 Dec 2015 17:32:09 +0000 Subject: Only include the archived rooms if a include_leave flag in set in the filter --- synapse/api/filtering.py | 4 ++++ synapse/handlers/sync.py | 12 ++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 19f30c273c..bc03d6c287 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -143,6 +143,10 @@ class FilterCollection(object): self.filter_json.get("account_data", {}) ) + self.include_leave = self.filter_json.get("room", {}).get( + "include_leave", False + ) + def timeline_limit(self): return self.room_timeline_filter.limit() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 943ce368ef..24c2b2fad6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -186,14 +186,14 @@ class SyncHandler(BaseHandler): pagination_config=pagination_config.get_source_config("presence"), key=None ) + + membership_list = (Membership.INVITE, Membership.JOIN) + if sync_config.filter.include_leave: + membership_list += (Membership.LEAVE, Membership.BAN) + room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=( - Membership.INVITE, - Membership.JOIN, - Membership.LEAVE, - Membership.BAN - ) + membership_list=membership_list ) account_data, account_data_by_room = ( -- cgit 1.5.1 From 99afb4b750f9ba5074f8e7dd79144cf678c668f1 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 10 Dec 2015 17:08:21 +0000 Subject: Ensure that the event that gets persisted is the one that was signed --- synapse/handlers/federation.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2855f2d7c3..e7ad48c948 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -596,7 +596,7 @@ class FederationHandler(BaseHandler): handled_events = set() try: - new_event = self._sign_event(event) + event = self._sign_event(event) # Try the host we successfully got a response to /make_join/ # request first. try: @@ -604,7 +604,7 @@ class FederationHandler(BaseHandler): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.replication_layer.send_join(target_hosts, new_event) + ret = yield self.replication_layer.send_join(target_hosts, event) origin = ret["origin"] state = ret["state"] @@ -613,12 +613,12 @@ class FederationHandler(BaseHandler): handled_events.update([s.event_id for s in state]) handled_events.update([a.event_id for a in auth_chain]) - handled_events.add(new_event.event_id) + handled_events.add(event.event_id) logger.debug("do_invite_join auth_chain: %s", auth_chain) logger.debug("do_invite_join state: %s", state) - logger.debug("do_invite_join event: %s", new_event) + logger.debug("do_invite_join event: %s", event) try: yield self.store.store_room( @@ -636,14 +636,14 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - new_event, event_stream_id, max_stream_id, + event, event_stream_id, max_stream_id, extra_users=[joinee] ) def log_failure(f): logger.warn( "Failed to notify about %s: %s", - new_event.event_id, f.value + event.event_id, f.value ) d.addErrback(log_failure) -- cgit 1.5.1 From 7d6b3133125aef802dad36d120ad23d5e33948bf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 10 Dec 2015 17:49:34 +0000 Subject: Add caches for whether a room has been forgotten by a user --- synapse/handlers/room.py | 2 +- synapse/storage/roommember.py | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 116a998c42..a72c3fda9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -755,7 +755,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((token, public_key, key_validity_url, display_name)) def forget(self, user, room_id): - self.store.forget(user.to_string(), room_id) + return self.store.forget(user.to_string(), room_id) class RoomListHandler(BaseHandler): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 69398b7c8e..e1777d7afa 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,7 +18,7 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import Membership from synapse.types import UserID @@ -270,6 +270,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(ret) + @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): @@ -284,9 +285,11 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" ) txn.execute(sql, (user_id, room_id)) - self.runInteraction("forget_membership", f) + yield self.runInteraction("forget_membership", f) + self.was_forgotten_at.invalidate_all() + self.did_forget.invalidate((user_id, room_id)) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=2) def did_forget(self, user_id, room_id): """Returns whether user_id has elected to discard history for room_id. @@ -310,7 +313,7 @@ class RoomMemberStore(SQLBaseStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=3) def was_forgotten_at(self, user_id, room_id, event_id): """Returns whether user_id has elected to discard history for room_id at event_id. -- cgit 1.5.1 From d9a5c56930c22b02268f5deca4df84eba345ec2c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Dec 2015 11:40:23 +0000 Subject: Include approximate count of search results --- synapse/handlers/search.py | 8 ++++++- synapse/storage/search.py | 56 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index bc79564287..99ef56871c 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -152,11 +152,15 @@ class SearchHandler(BaseHandler): highlights = set() + count = None + if order_by == "rank": search_result = yield self.store.search_msgs( room_ids, search_term, keys ) + count = search_result["count"] + if search_result["highlights"]: highlights.update(search_result["highlights"]) @@ -207,6 +211,8 @@ class SearchHandler(BaseHandler): if search_result["highlights"]: highlights.update(search_result["highlights"]) + count = search_result["count"] + results = search_result["results"] results_map = {r["event"].event_id: r for r in results} @@ -359,7 +365,7 @@ class SearchHandler(BaseHandler): rooms_cat_res = { "results": results, - "count": len(results), + "count": count, "highlights": list(highlights), } diff --git a/synapse/storage/search.py b/synapse/storage/search.py index c39d54a7ca..efd87d99bb 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -162,6 +162,9 @@ class SearchStore(BackgroundUpdateStore): "(%s)" % (" OR ".join(local_clauses),) ) + count_args = args + count_clauses = clauses + if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank," @@ -170,6 +173,12 @@ class SearchStore(BackgroundUpdateStore): " WHERE vector @@ to_tsquery('english', ?)" ) args = [search_query, search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE vector @@ to_tsquery('english', ?)" + ) + count_args = [search_query] + count_args elif isinstance(self.database_engine, Sqlite3Engine): sql = ( "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" @@ -177,6 +186,12 @@ class SearchStore(BackgroundUpdateStore): " WHERE value MATCH ?" ) args = [search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE value MATCH ? AND " + ) + count_args = [search_term] + count_args else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -184,6 +199,9 @@ class SearchStore(BackgroundUpdateStore): for clause in clauses: sql += " AND " + clause + for clause in count_clauses: + count_sql += " AND " + clause + # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. sql += " ORDER BY rank DESC LIMIT 500" @@ -205,6 +223,14 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): highlights = yield self._find_highlights_in_postgres(search_query, events) + count_sql += " GROUP BY room_id" + + count_results = yield self._execute( + "search_rooms_count", self.cursor_to_dict, count_sql, *count_args + ) + + count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) + defer.returnValue({ "results": [ { @@ -215,6 +241,7 @@ class SearchStore(BackgroundUpdateStore): if r["event_id"] in event_map ], "highlights": highlights, + "count": count, }) @defer.inlineCallbacks @@ -254,6 +281,9 @@ class SearchStore(BackgroundUpdateStore): "(%s)" % (" OR ".join(local_clauses),) ) + count_args = args + count_clauses = clauses + if pagination_token: try: origin_server_ts, stream = pagination_token.split(",") @@ -276,7 +306,13 @@ class SearchStore(BackgroundUpdateStore): " NATURAL JOIN events" " WHERE vector @@ to_tsquery('english', ?) AND " ) - args = [search_term, search_term] + args + args = [search_query, search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE vector @@ to_tsquery('english', ?) AND " + ) + count_args = [search_query] + count_args elif isinstance(self.database_engine, Sqlite3Engine): # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin @@ -296,12 +332,19 @@ class SearchStore(BackgroundUpdateStore): " CROSS JOIN events USING (event_id)" " WHERE " ) - args = [search_term] + args + args = [search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE value MATCH ? AND " + ) + count_args = [search_term] + count_args else: # This should be unreachable. raise Exception("Unrecognized database engine") sql += " AND ".join(clauses) + count_sql += " AND ".join(count_clauses) # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. @@ -326,6 +369,14 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): highlights = yield self._find_highlights_in_postgres(search_query, events) + count_sql += " GROUP BY room_id" + + count_results = yield self._execute( + "search_rooms_count", self.cursor_to_dict, count_sql, *count_args + ) + + count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) + defer.returnValue({ "results": [ { @@ -339,6 +390,7 @@ class SearchStore(BackgroundUpdateStore): if r["event_id"] in event_map ], "highlights": highlights, + "count": count, }) def _find_highlights_in_postgres(self, search_query, events): -- cgit 1.5.1 From 1ee7280c4c7a6ad99236a10a861fde3cd013892b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 11 Dec 2015 16:48:20 +0000 Subject: Do the /sync in parallel accross the rooms like /initialSync does --- synapse/handlers/sync.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 24c2b2fad6..7088c20cb4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -17,6 +17,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes +from synapse.util import unwrapFirstError from twisted.internet import defer @@ -209,9 +210,10 @@ class SyncHandler(BaseHandler): joined = [] invited = [] archived = [] + deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.full_state_sync_for_joined_room( + room_sync_deferred = self.full_state_sync_for_joined_room( room_id=event.room_id, sync_config=sync_config, now_token=now_token, @@ -220,7 +222,8 @@ class SyncHandler(BaseHandler): tags_by_room=tags_by_room, account_data_by_room=account_data_by_room, ) - joined.append(room_sync) + room_sync_deferred.addCallback(joined.append) + deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: invite = yield self.store.get_event(event.event_id) invited.append(InvitedSyncResult( @@ -231,7 +234,7 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync = yield self.full_state_sync_for_archived_room( + room_sync_deferred = self.full_state_sync_for_archived_room( sync_config=sync_config, room_id=event.room_id, leave_event_id=event.event_id, @@ -240,7 +243,12 @@ class SyncHandler(BaseHandler): tags_by_room=tags_by_room, account_data_by_room=account_data_by_room, ) - archived.append(room_sync) + room_sync_deferred.addCallback(archived.append) + deferreds.append(room_sync_deferred) + + yield defer.gatherResults( + deferreds, consumeErrors=True + ).addErrback(unwrapFirstError) defer.returnValue(SyncResult( presence=presence, -- cgit 1.5.1 From dbe7892e03e2e0e6a50c54109c30b22fe4194894 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 14 Dec 2015 15:09:41 +0000 Subject: Fix a race between started/stopped stream --- synapse/handlers/events.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index fe300433e6..576d77e0e7 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -69,7 +69,12 @@ class EventStreamHandler(BaseHandler): A deferred that completes once their presence has been updated. """ if user not in self._streams_per_user: - self._streams_per_user[user] = 0 + # Make sure we set the streams per user to 1 here rather than + # setting it to zero and incrementing the value below. + # Otherwise this may race with stopped_stream causing the + # user to be erased from the map before we have a chance + # to increment it. + self._streams_per_user[user] = 1 if user in self._stop_timer_per_user: try: self.clock.cancel_call_later( @@ -79,8 +84,8 @@ class EventStreamHandler(BaseHandler): logger.exception("Failed to cancel event timer") else: yield started_user_eventstream(self.distributor, user) - - self._streams_per_user[user] += 1 + else: + self._streams_per_user[user] += 1 def stopped_stream(self, user): """If there are no streams for a user this starts a timer that will -- cgit 1.5.1 From 0311612ce9c70d2748cdf2badbd87c854ef5ba8d Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Wed, 16 Dec 2015 13:05:32 +0000 Subject: Give the IS a bunch more 3pid invite context This allows it to form richer emails --- synapse/handlers/room.py | 67 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a72c3fda9f..6a482dacc9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -704,13 +704,48 @@ class RoomMemberHandler(BaseHandler): token_id, txn_id ): + room_state = yield self.hs.get_state_handler().get_current_state(room_id) + + inviter_display_name = "" + inviter_avatar_url = "" + member_event = room_state.get((EventTypes.Member, user.to_string())) + if member_event: + inviter_display_name = member_event.content.get("displayname", "") + inviter_avatar_url = member_event.content.get("avatar_url", "") + + canonical_room_alias = "" + canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, "")) + if canonical_alias_event: + canonical_room_alias = canonical_alias_event.content.get("alias", "") + + room_name = "" + room_name_event = room_state.get((EventTypes.Name, "")) + if room_name_event: + room_name = room_name_event.content.get("name", "") + + room_join_rules = "" + join_rules_event = room_state.get((EventTypes.JoinRules, "")) + if join_rules_event: + room_join_rules = join_rules_event.content.get("join_rule", "") + + room_avatar_url = "" + room_avatar_event = room_state.get((EventTypes.RoomAvatar, "")) + if room_avatar_event: + room_avatar_url = room_avatar_event.content.get("url", "") + token, public_key, key_validity_url, display_name = ( yield self._ask_id_server_for_third_party_invite( - id_server, - medium, - address, - room_id, - user.to_string() + id_server=id_server, + medium=medium, + address=address, + room_id=room_id, + inviter_user_id=user.to_string(), + room_alias=canonical_room_alias, + room_avatar_url=room_avatar_url, + room_join_rules=room_join_rules, + room_name=room_name, + inviter_display_name=inviter_display_name, + inviter_avatar_url=inviter_avatar_url ) ) msg_handler = self.hs.get_handlers().message_handler @@ -732,7 +767,19 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( - self, id_server, medium, address, room_id, sender): + self, + id_server, + medium, + address, + room_id, + inviter_user_id, + room_alias, + room_avatar_url, + room_join_rules, + room_name, + inviter_display_name, + inviter_avatar_url + ): is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( id_server_scheme, id_server, ) @@ -742,7 +789,13 @@ class RoomMemberHandler(BaseHandler): "medium": medium, "address": address, "room_id": room_id, - "sender": sender, + "room_alias": room_alias, + "room_avatar_url": room_avatar_url, + "room_join_rules": room_join_rules, + "room_name": room_name, + "sender": inviter_user_id, + "sender_display_name": inviter_display_name, + "sender_avatar_url": inviter_avatar_url, } ) # TODO: Check for success -- cgit 1.5.1 From 8c5f252edbb0c62663116c6a541ce8691414996a Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 17 Dec 2015 18:09:51 +0100 Subject: Strip address and such out of 3pid invites We're not meant to leak that into the graph --- synapse/api/auth.py | 2 +- synapse/handlers/federation.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index b9c3e6d2c4..adb7d64482 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -778,7 +778,7 @@ class Auth(object): if "third_party_invite" in event.content: key = ( EventTypes.ThirdPartyInvite, - event.content["third_party_invite"]["token"] + event.content["third_party_invite"]["signed"]["token"] ) third_party_invite = current_state.get(key) if third_party_invite: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e7ad48c948..1255241461 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1650,11 +1650,22 @@ class FederationHandler(BaseHandler): sender = invite["sender"] room_id = invite["room_id"] + if "signed" not in invite: + logger.info( + "Discarding received notification of third party invite " + "without signed: %s" % (invite,) + ) + return + + third_party_invite = { + "signed": invite["signed"], + } + event_dict = { "type": EventTypes.Member, "content": { "membership": Membership.INVITE, - "third_party_invite": invite, + "third_party_invite": third_party_invite, }, "room_id": room_id, "sender": sender, -- cgit 1.5.1 From bdacee476d2642753cfa54f5092e56ecb148ff56 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 17 Dec 2015 18:31:20 +0100 Subject: Add display_name to 3pid invite in m.room.member invites --- synapse/handlers/federation.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1255241461..28f2ff68d6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1650,7 +1650,7 @@ class FederationHandler(BaseHandler): sender = invite["sender"] room_id = invite["room_id"] - if "signed" not in invite: + if "signed" not in invite or "token" not in invite["signed"]: logger.info( "Discarding received notification of third party invite " "without signed: %s" % (invite,) @@ -1676,6 +1676,11 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) event, context = yield self._create_new_client_event(builder=builder) + + event, context = yield self.add_display_name_to_third_party_invite( + event_dict, event, context + ) + self.auth.check(event, context.current_state) yield self._validate_keyserver(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler @@ -1697,6 +1702,10 @@ class FederationHandler(BaseHandler): builder=builder, ) + event, context = yield self.add_display_name_to_third_party_invite( + event_dict, event, context + ) + self.auth.check(event, auth_events=context.current_state) yield self._validate_keyserver(event, auth_events=context.current_state) @@ -1706,6 +1715,27 @@ class FederationHandler(BaseHandler): member_handler = self.hs.get_handlers().room_member_handler yield member_handler.change_membership(event, context) + @defer.inlineCallbacks + def add_display_name_to_third_party_invite(self, event_dict, event, context): + key = ( + EventTypes.ThirdPartyInvite, + event.content["third_party_invite"]["signed"]["token"] + ) + original_invite = context.current_state.get(key) + if not original_invite: + logger.info( + "Could not find invite event for third_party_invite - " + "discarding: %s" % (event_dict,) + ) + return + + display_name = original_invite.content["display_name"] + event_dict["content"]["third_party_invite"]["display_name"] = display_name + builder = self.event_builder_factory.new(event_dict) + EventValidator().validate_new(builder) + event, context = yield self._create_new_client_event(builder=builder) + defer.returnValue((event, context)) + @defer.inlineCallbacks def _validate_keyserver(self, event, auth_events): token = event.content["third_party_invite"]["signed"]["token"] -- cgit 1.5.1 From 772ad4f71503866842eb9033b220b757ef20e711 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 17 Dec 2015 23:04:20 +0000 Subject: stop generating default identicons. reverts most of 582019f870adbc4a8a8a9ef97b527e0fead77761 and solves vector-web/vector-im#346 --- synapse/handlers/register.py | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index a037da0f70..8a365c20f9 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -132,25 +132,9 @@ class RegistrationHandler(BaseHandler): raise RegistrationError( 500, "Cannot generate user ID.") - # create a default avatar for the user - # XXX: ideally clients would explicitly specify one, but given they don't - # and we want consistent and pretty identicons for random users, we'll - # do it here. - try: - auth_user = UserID.from_string(user_id) - media_repository = self.hs.get_resource_for_media_repository() - identicon_resource = media_repository.getChildWithDefault("identicon", None) - upload_resource = media_repository.getChildWithDefault("upload", None) - identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320) - content_uri = yield upload_resource.create_content( - "image/png", None, identicon_bytes, len(identicon_bytes), auth_user - ) - profile_handler = self.hs.get_handlers().profile_handler - profile_handler.set_avatar_url( - auth_user, auth_user, ("%s#auto" % (content_uri,)) - ) - except NotImplementedError: - pass # make tests pass without messing around creating default avatars + # We used to generate default identicons here, but nowadays + # we want clients to generate their own as part of their branding + # rather than there being consistent matrix-wide ones, so we don't. defer.returnValue((user_id, token)) -- cgit 1.5.1 From 64374bda5b47e043a5ff3c0af23bd29461596059 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 17 Dec 2015 23:04:53 +0000 Subject: fix indentation level --- synapse/handlers/register.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 8a365c20f9..698e7d4479 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -132,9 +132,9 @@ class RegistrationHandler(BaseHandler): raise RegistrationError( 500, "Cannot generate user ID.") - # We used to generate default identicons here, but nowadays - # we want clients to generate their own as part of their branding - # rather than there being consistent matrix-wide ones, so we don't. + # We used to generate default identicons here, but nowadays + # we want clients to generate their own as part of their branding + # rather than there being consistent matrix-wide ones, so we don't. defer.returnValue((user_id, token)) -- cgit 1.5.1 From ce4999268a06ccc716d1340b0f4c3e88103d7084 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 18 Dec 2015 10:06:56 +0000 Subject: Fix typo that broke registration on the mobile clients --- synapse/handlers/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index a037da0f70..19df5aa852 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -42,7 +42,7 @@ class RegistrationHandler(BaseHandler): self.distributor = hs.get_distributor() self.distributor.declare("registered_user") - self.captch_client = CaptchaServerHttpClient(hs) + self.captcha_client = CaptchaServerHttpClient(hs) @defer.inlineCallbacks def check_username(self, localpart): -- cgit 1.5.1 From 7f3148865ce46042248d84104cb70b1645cdf86c Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 21 Dec 2015 19:38:04 +0000 Subject: Return room avatar URLs in /publicRooms Spec: https://github.com/matrix-org/matrix-doc/pull/244 Tests: https://github.com/matrix-org/sytest/pull/121 --- synapse/handlers/room.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6a482dacc9..13f66e0df0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -816,7 +816,8 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): chunk = yield self.store.get_rooms(is_public=True) - results = yield defer.gatherResults( + + room_members = yield defer.gatherResults( [ self.store.get_users_in_room(room["room_id"]) for room in chunk @@ -824,12 +825,30 @@ class RoomListHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) + avatar_urls = yield defer.gatherResults( + [ + self.get_room_avatar_url(room["room_id"]) + for room in chunk + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + for i, room in enumerate(chunk): - room["num_joined_members"] = len(results[i]) + room["num_joined_members"] = len(room_members[i]) + if avatar_urls[i]: + room["avatar_url"] = avatar_urls[i] # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) + @defer.inlineCallbacks + def get_room_avatar_url(self, room_id): + event = yield self.hs.get_state_handler().get_current_state( + room_id, "m.room.avatar" + ) + if event and "url" in event.content: + defer.returnValue(event.content["url"]) + class RoomContextHandler(BaseHandler): @defer.inlineCallbacks -- cgit 1.5.1 From 45a9e0ae0c9a4c55d4648802fefe96cc2933304f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 10:25:46 +0000 Subject: Allow guest access if the user provides a list of rooms in the filter --- synapse/api/filtering.py | 12 ++++++++++++ synapse/handlers/sync.py | 1 + synapse/rest/client/v2_alpha/sync.py | 10 +++++++++- 3 files changed, 22 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 35faa53746..8c8c7b642e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -144,6 +144,9 @@ class FilterCollection(object): "include_leave", False ) + def list_rooms(self): + return self.room_filter.list_rooms() + def timeline_limit(self): return self.room_timeline_filter.limit() @@ -176,6 +179,15 @@ class Filter(object): def __init__(self, filter_json): self.filter_json = filter_json + def list_rooms(self): + """The list of room_id strings this filter restricts the output to + or None if the this filter doesn't list the room ids. + """ + if "rooms" in self.filter_json: + return list(set(self.filter_json["rooms"])) + else: + return None + def check(self, event): """Checks whether the filter matches the given event. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7088c20cb4..75ef742328 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", + "is_guest", "filter", ]) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 697df03dda..35a70ffad1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -85,7 +85,9 @@ class SyncRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): - user, token_id, _ = yield self.auth.get_user_by_req(request) + user, token_id, is_guest = yield self.auth.get_user_by_req( + request, allow_guest=True + ) timeout = parse_integer(request, "timeout", default=0) since = parse_string(request, "since") @@ -118,8 +120,14 @@ class SyncRestServlet(RestServlet): except: filter = FilterCollection({}) + if is_guest and filter.list_rooms() is None: + raise SynapseError( + 400, "Guest users must provide a list of rooms in the filter" + ) + sync_config = SyncConfig( user=user, + is_guest=is_guest, filter=filter, ) -- cgit 1.5.1 From c3fff251a99bc512d4f9ec362152bd532b4808ef Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 11:21:03 +0000 Subject: Allow guest access to /sync --- synapse/handlers/sync.py | 144 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 98 insertions(+), 46 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 75ef742328..9c8ea2bbb2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -15,8 +15,8 @@ from ._base import BaseHandler -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes +from synapse.api.errors import AuthError from synapse.util import unwrapFirstError from twisted.internet import defer @@ -118,6 +118,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.presence or self.joined or self.invited ) +GuestRoom = collections.namedtuple("GuestRoom", ("room_id", "membership")) + class SyncHandler(BaseHandler): @@ -136,6 +138,12 @@ class SyncHandler(BaseHandler): A Deferred SyncResult. """ + if sync_config.is_guest: + for room_id in sync_config.filter.list_rooms(): + world_readable = yield self._is_world_readable(room_id) + if not world_readable: + raise AuthError(403, "Guest access not allowed") + if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. @@ -152,6 +160,17 @@ class SyncHandler(BaseHandler): ) defer.returnValue(result) + @defer.inlineCallbacks + def _is_world_readable(self, room_id): + state = yield self.hs.get_state_handler().get_current_state( + room_id, + EventTypes.RoomHistoryVisibility + ) + if state and "history_visibility" in state.content: + defer.returnValue(state.content["history_visibility"] == "world_readable") + else: + defer.returnValue(False) + def current_sync_for_user(self, sync_config, since_token=None, full_state=False): """Get the sync for client needed to match what the server has now. @@ -175,37 +194,54 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) + if sync_config.is_guest: + room_list = [] + for room_id in sync_config.filter.list_rooms(): + room_list.append(GuestRoom(room_id, Membership.JOIN)) - presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) + account_data = {} + account_data_by_room = {} + tags_by_room = {} - membership_list = (Membership.INVITE, Membership.JOIN) - if sync_config.filter.include_leave: - membership_list += (Membership.LEAVE, Membership.BAN) + # TODO: Hook up read receipts + ephemeral_by_room = {} - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=membership_list - ) + else: + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token + ) + + membership_list = (Membership.INVITE, Membership.JOIN) + if sync_config.filter.include_leave: + membership_list += (Membership.LEAVE, Membership.BAN) - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=sync_config.user.to_string(), + membership_list=membership_list + ) + + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + tags_by_room = yield self.store.get_tags_for_user( sync_config.user.to_string() ) - ) - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() + presence_stream = self.event_sources.sources["presence"] + + joined_room_ids = [ + room.room_id for room in room_list + if room.membership == Membership.JOIN + ] + + presence, _ = yield presence_stream.get_new_events( + from_key=0, + user=sync_config.user, + room_ids=joined_room_ids, + is_guest=sync_config.is_guest, ) joined = [] @@ -411,8 +447,36 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] + if sync_config.is_guest: + room_ids = sync_config.filter.list_rooms() + + ephemeral_by_room = {} + + tags_by_room = {} + account_data = {} + account_data_by_room = {} + + else: + rooms = yield self.store.get_rooms_for_user( + sync_config.user.to_string() + ) + room_ids = [room.room_id for room in rooms] + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token, since_token + ) + + tags_by_room = yield self.store.get_updated_tags( + sync_config.user.to_string(), + since_token.account_data_key, + ) + + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + sync_config.user.to_string(), + since_token.account_data_key, + ) + ) presence_source = self.event_sources.sources["presence"] presence, presence_key = yield presence_source.get_new_events( @@ -420,15 +484,10 @@ class SyncHandler(BaseHandler): from_key=since_token.presence_key, limit=sync_config.filter.presence_limit(), room_ids=room_ids, - # /sync doesn't support guest access, they can't get to this point in code - is_guest=False, + is_guest=sync_config.is_guest, ) now_token = now_token.copy_and_replace("presence_key", presence_key) - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - rm_handler = self.hs.get_handlers().room_member_handler app_service = yield self.store.get_app_service_by_user_id( sync_config.user.to_string() @@ -448,18 +507,8 @@ class SyncHandler(BaseHandler): from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, - ) - - tags_by_room = yield self.store.get_updated_tags( - sync_config.user.to_string(), - since_token.account_data_key, - ) - - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - sync_config.user.to_string(), - since_token.account_data_key, - ) + room_ids=room_ids if sync_config.is_guest else (), + is_guest=sync_config.is_guest, ) joined = [] @@ -591,7 +640,10 @@ class SyncHandler(BaseHandler): end_key = "s" + room_key.split('-')[-1] loaded_recents = sync_config.filter.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( - sync_config.user.to_string(), loaded_recents, + sync_config.user.to_string(), + loaded_recents, + is_guest=sync_config.is_guest, + require_all_visible_for_guests=False ) loaded_recents.extend(recents) recents = loaded_recents -- cgit 1.5.1 From cdd04f70556c4f098b6975d07e2685b88aac8bf9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 11:59:55 +0000 Subject: Hook up read receipts and typing notifications for guest access --- synapse/handlers/sync.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9c8ea2bbb2..4753166a0c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -203,14 +203,7 @@ class SyncHandler(BaseHandler): account_data_by_room = {} tags_by_room = {} - # TODO: Hook up read receipts - ephemeral_by_room = {} - else: - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - membership_list = (Membership.INVITE, Membership.JOIN) if sync_config.filter.include_leave: membership_list += (Membership.LEAVE, Membership.BAN) @@ -244,6 +237,10 @@ class SyncHandler(BaseHandler): is_guest=sync_config.is_guest, ) + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token, joined_room_ids + ) + joined = [] invited = [] archived = [] @@ -352,11 +349,13 @@ class SyncHandler(BaseHandler): return account_data_events @defer.inlineCallbacks - def ephemeral_by_room(self, sync_config, now_token, since_token=None): + def ephemeral_by_room(self, sync_config, now_token, room_ids, + since_token=None): """Get the ephemeral events for each room the user is in Args: sync_config (SyncConfig): The flags, filters and user for the sync. now_token (StreamToken): Where the server is currently up to. + room_ids (list): List of room id strings to get data for. since_token (StreamToken): Where the server was when the client last synced. Returns: @@ -367,9 +366,6 @@ class SyncHandler(BaseHandler): typing_key = since_token.typing_key if since_token else "0" - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( user=sync_config.user, @@ -450,8 +446,6 @@ class SyncHandler(BaseHandler): if sync_config.is_guest: room_ids = sync_config.filter.list_rooms() - ephemeral_by_room = {} - tags_by_room = {} account_data = {} account_data_by_room = {} @@ -478,6 +472,10 @@ class SyncHandler(BaseHandler): ) ) + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token, room_ids, since_token + ) + presence_source = self.event_sources.sources["presence"] presence, presence_key = yield presence_source.get_new_events( user=sync_config.user, -- cgit 1.5.1 From 251aafcccad74aef96def0adbf5d9387e1d19199 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 14:03:24 +0000 Subject: Use a list comprehension --- synapse/handlers/sync.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4753166a0c..38c185cd54 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -195,9 +195,10 @@ class SyncHandler(BaseHandler): now_token = yield self.event_sources.get_current_token() if sync_config.is_guest: - room_list = [] - for room_id in sync_config.filter.list_rooms(): - room_list.append(GuestRoom(room_id, Membership.JOIN)) + room_list = [ + GuestRoom(room_id, Membership.JOIN) + for room_id in sync_config.filter.list_rooms() + ] account_data = {} account_data_by_room = {} -- cgit 1.5.1 From 0ee01383252cafd30ed92e3b655847d07cacee3a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 15:49:32 +0000 Subject: Include the list of bad room ids in the error --- synapse/api/errors.py | 16 ++++++++++++++++ synapse/handlers/sync.py | 10 ++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index d4037b3d55..8bc7b9e6db 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -120,6 +120,22 @@ class AuthError(SynapseError): super(AuthError, self).__init__(*args, **kwargs) +class GuestAccessError(AuthError): + """An error raised when a there is a problem with a guest user accessing + a room""" + + def __init__(self, rooms, *args, **kwargs): + self.rooms = rooms + super(GuestAccessError, self).__init__(*args, **kwargs) + + def error_dict(self): + return cs_error( + self.msg, + self.errcode, + rooms=self.rooms, + ) + + class EventSizeError(SynapseError): """An error raised when an event is too big.""" diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 38c185cd54..feea407ea2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,7 +16,7 @@ from ._base import BaseHandler from synapse.api.constants import Membership, EventTypes -from synapse.api.errors import AuthError +from synapse.api.errors import GuestAccessError from synapse.util import unwrapFirstError from twisted.internet import defer @@ -139,10 +139,16 @@ class SyncHandler(BaseHandler): """ if sync_config.is_guest: + bad_rooms = [] for room_id in sync_config.filter.list_rooms(): world_readable = yield self._is_world_readable(room_id) if not world_readable: - raise AuthError(403, "Guest access not allowed") + bad_rooms.append(room_id) + + if bad_rooms: + raise GuestAccessError( + bad_rooms, 403, "Guest access not allowed" + ) if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling -- cgit 1.5.1 From 9ac417fa88906d70de6a7c6f94d40fe11fc6d2fa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 18:27:56 +0000 Subject: Add a cache for initialSync responses that expires after 5 minutes --- synapse/handlers/message.py | 24 +++++++++++- synapse/util/caches/snapshot_cache.py | 71 +++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 synapse/util/caches/snapshot_cache.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ccdd3d8473..bef477b31e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,6 +22,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler @@ -45,6 +46,7 @@ class MessageHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() + self.snapshot_cache = SnapshotCache() @defer.inlineCallbacks def get_message(self, msg_id=None, room_id=None, sender_id=None, @@ -326,9 +328,29 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) - @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): + key = ( + user_id, + pagin_config.from_token, + pagin_config.to_token, + pagin_config.direction, + pagin_config.limit, + as_client_event, + include_archived, + ) + now_ms = self.clock.time_msec() + result = self.snapshot_cache.get(now_ms, key) + if result is not None: + return result + + return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( + user_id, pagin_config, as_client_event, include_archived + )) + + @defer.inlineCallbacks + def _snapshot_all_rooms(self, user_id=None, pagin_config=None, + as_client_event=True, include_archived=False): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py new file mode 100644 index 0000000000..b19aca05ab --- /dev/null +++ b/synapse/util/caches/snapshot_cache.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.async import ObservableDeferred + + +class SnapshotCache(object): + + DURATION_MS = 5 * 60 * 1000 # Cache results for 2 minutes. + + def __init__(self): + self.pending_result_cache = {} # Request that haven't finished yet. + self.prev_result_cache = {} # The older requests that have finished. + self.next_result_cache = {} # The newer requests that have finished. + self.time_last_rotated_ms = 0 + + def rotate(self, time_now_ms): + # Rotate once if the cache duration has passed since the last rotation. + if time_now_ms - self.time_last_rotated_ms > self.DURATION_MS: + self.prev_result_cache = self.next_result_cache + self.next_result_cache = {} + self.time_last_rotated_ms += self.DURATION_MS + + # Rotate again if the cache duration has passed twice since the last + # rotation. + if time_now_ms - self.time_last_rotated_ms > self.DURATION_MS: + self.prev_result_cache = self.next_result_cache + self.next_result_cache = {} + self.time_last_rotated_ms = time_now_ms + + def get(self, time_now_ms, key): + self.rotate(time_now_ms) + # This cache is intended to deduplicate requests, so we expect it to be + # missed most of the time. So we just lookup the key in all of the + # dictionaries rather than trying to short circuit the lookup if the + # key is found. + result = self.prev_result_cache.get(key) + result = self.next_result_cache.get(key, result) + result = self.pending_result_cache.get(key, result) + if result is not None: + return result.observe() + + def set(self, time_now_ms, key, deferred): + self.rotate(time_now_ms) + + result = ObservableDeferred(deferred) + + self.pending_result_cache[key] = result + + def shuffle_along(r): + # When the deferred completes we shuffle it along to the first + # generation of the result cache. So that it will eventually + # expire from the rotation of that cache. + self.next_result_cache[key] = result + self.pending_result_cache.pop(key, None) + + result.observe().addBoth(shuffle_along) + + return result.observe() -- cgit 1.5.1 From 517fb9a023733c064dfabdcfdf4ed75bcff3f7bd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 18:53:47 +0000 Subject: Move the doc string to the public facing method --- synapse/handlers/message.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index bef477b31e..a1bed9b0dc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -330,6 +330,23 @@ class MessageHandler(BaseHandler): def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): + """Retrieve a snapshot of all rooms the user is invited or has joined. + + This snapshot may include messages for all rooms where the user is + joined, depending on the pagination config. + + Args: + user_id (str): The ID of the user making the request. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config used to determine how many messages *PER ROOM* to return. + as_client_event (bool): True to get events in client-server format. + include_archived (bool): True to get rooms that the user has left + Returns: + A list of dicts with "room_id" and "membership" keys for all rooms + the user is currently invited or joined in on. Rooms where the user + is joined on, may return a "messages" key with messages, depending + on the specified PaginationConfig. + """ key = ( user_id, pagin_config.from_token, @@ -351,23 +368,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): - """Retrieve a snapshot of all rooms the user is invited or has joined. - - This snapshot may include messages for all rooms where the user is - joined, depending on the pagination config. - Args: - user_id (str): The ID of the user making the request. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config used to determine how many messages *PER ROOM* to return. - as_client_event (bool): True to get events in client-server format. - include_archived (bool): True to get rooms that the user has left - Returns: - A list of dicts with "room_id" and "membership" keys for all rooms - the user is currently invited or joined in on. Rooms where the user - is joined on, may return a "messages" key with messages, depending - on the specified PaginationConfig. - """ memberships = [Membership.INVITE, Membership.JOIN] if include_archived: memberships.append(Membership.LEAVE) -- cgit 1.5.1