diff options
Diffstat (limited to 'synapse')
62 files changed, 1001 insertions, 329 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 7ff37edf2c..3e7e26bf60 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.11.0-r2" +__version__ = "0.11.1" diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 18f2ec3ae8..bc03d6c287 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -50,7 +50,7 @@ class Filtering(object): # many definitions. top_level_definitions = [ - "presence" + "presence", "account_data" ] room_level_definitions = [ @@ -139,6 +139,14 @@ class FilterCollection(object): self.filter_json.get("presence", {}) ) + self.account_data = Filter( + self.filter_json.get("account_data", {}) + ) + + self.include_leave = self.filter_json.get("room", {}).get( + "include_leave", False + ) + def timeline_limit(self): return self.room_timeline_filter.limit() @@ -151,6 +159,9 @@ class FilterCollection(object): def filter_presence(self, events): return self.presence_filter.filter(events) + def filter_account_data(self, events): + return self.account_data.filter(events) + def filter_room_state(self, events): return self.room_state_filter.filter(events) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index cd7a52ec07..56bc52e9ca 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -499,13 +499,28 @@ class SynapseRequest(Request): self.start_time = int(time.time() * 1000) def finished_processing(self): + + try: + context = LoggingContext.current_context() + ru_utime, ru_stime = context.get_resource_usage() + db_txn_count = context.db_txn_count + db_txn_duration = context.db_txn_duration + except: + ru_utime, ru_stime = (0, 0) + db_txn_count, db_txn_duration = (0, 0) + self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + " Processed request: %dms (%dms, %dms) (%dms/%d)" + " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, self.authenticated_entity, int(time.time() * 1000) - self.start_time, + int(ru_utime * 1000), + int(ru_stime * 1000), + int(db_txn_duration * 1000), + int(db_txn_count), self.sentLength, self.code, self.method, diff --git a/synapse/config/server.py b/synapse/config/server.py index 5c2d6bfeab..187edd516b 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -133,6 +133,7 @@ class ServerConfig(Config): # The domain name of the server, with optional explicit port. # This is used by remote servers to connect to this server, # e.g. matrix.org, localhost:8080, etc. + # This is also the last part of your UserID. server_name: "%(server_name)s" # When running as a daemon, the file to store the pid in diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 44cc1ef132..e634b149ba 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -100,22 +100,20 @@ def format_event_raw(d): def format_event_for_client_v1(d): - d["user_id"] = d.pop("sender", None) + d = format_event_for_client_v2(d) + + sender = d.get("sender") + if sender is not None: + d["user_id"] = sender - move_keys = ( + copy_keys = ( "age", "redacted_because", "replaces_state", "prev_content", "invite_room_state", ) - for key in move_keys: + for key in copy_keys: if key in d["unsigned"]: d[key] = d["unsigned"][key] - drop_keys = ( - "auth_events", "prev_events", "hashes", "signatures", "depth", - "unsigned", "origin", "prev_state" - ) - for key in drop_keys: - d.pop(key, None) return d diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 127b4da4f8..6b164fd2d1 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -165,7 +165,7 @@ class BaseFederationServlet(object): if code is None: continue - server.register_path(method, pattern, self._wrap(code)) + server.register_paths(method, (pattern,), self._wrap(code)) class FederationSendServlet(BaseFederationServlet): diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 1d35d3b7dc..fe773bee9b 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -29,9 +29,10 @@ class AccountDataEventSource(object): last_stream_id = from_key current_stream_id = yield self.store.get_max_account_data_stream_id() - tags = yield self.store.get_updated_tags(user_id, last_stream_id) results = [] + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + for room_id, room_tags in tags.items(): results.append({ "type": "m.tag", @@ -39,6 +40,24 @@ class AccountDataEventSource(object): "room_id": room_id, }) + account_data, room_account_data = ( + yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) + ) + + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + }) + + for room_id, account_data in room_account_data.items(): + for account_data_type, content in account_data.items(): + results.append({ + "type": account_data_type, + "content": content, + "room_id": room_id, + }) + defer.returnValue((results, current_stream_id)) @defer.inlineCallbacks diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index d852a18555..04fa58df65 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -30,34 +30,27 @@ class AdminHandler(BaseHandler): @defer.inlineCallbacks def get_whois(self, user): - res = yield self.store.get_user_ip_and_agents(user) - - d = {} - for r in res: - # Note that device_id is always None - device = d.setdefault(r["device_id"], {}) - session = device.setdefault(r["access_token"], []) - session.append({ - "ip": r["ip"], - "user_agent": r["user_agent"], - "last_seen": r["last_seen"], + connections = [] + + sessions = yield self.store.get_user_ip_and_agents(user) + for session in sessions: + connections.append({ + "ip": session["ip"], + "last_seen": session["last_seen"], + "user_agent": session["user_agent"], }) ret = { "user_id": user.to_string(), - "devices": [ - { - "device_id": k, + "devices": { + "": { "sessions": [ { - # "access_token": x, TODO (erikj) - "connections": y, + "connections": connections, } - for x, y in v.items() ] - } - for k, v in d.items() - ], + }, + }, } defer.returnValue(ret) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0e4c0d4d06..fe300433e6 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -28,6 +28,18 @@ import random logger = logging.getLogger(__name__) +def started_user_eventstream(distributor, user): + return distributor.fire("started_user_eventstream", user) + + +def stopped_user_eventstream(distributor, user): + return distributor.fire("stopped_user_eventstream", user) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -66,7 +78,7 @@ class EventStreamHandler(BaseHandler): except: logger.exception("Failed to cancel event timer") else: - yield self.distributor.fire("started_user_eventstream", user) + yield started_user_eventstream(self.distributor, user) self._streams_per_user[user] += 1 @@ -89,7 +101,7 @@ class EventStreamHandler(BaseHandler): self._stop_timer_per_user.pop(user, None) - return self.distributor.fire("stopped_user_eventstream", user) + return stopped_user_eventstream(self.distributor, user) logger.debug("Scheduling _later: for %s", user) self._stop_timer_per_user[user] = ( @@ -120,9 +132,7 @@ class EventStreamHandler(BaseHandler): timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) if is_guest: - yield self.distributor.fire( - "user_joined_room", user=auth_user, room_id=room_id - ) + yield user_joined_room(self.distributor, auth_user, room_id) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c1bce07e31..2855f2d7c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,10 @@ import logging logger = logging.getLogger(__name__) +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user, room_id) + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -60,10 +64,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.distributor.observe( - "user_joined_room", - self._on_user_joined - ) + self.distributor.observe("user_joined_room", self.user_joined_room) self.waiting_for_join_list = {} @@ -176,7 +177,7 @@ class FederationHandler(BaseHandler): ) try: - _, event_stream_id, max_stream_id = yield self._handle_new_event( + context, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -233,10 +234,13 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.membership == Membership.JOIN: - user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally + # joined the room. Don't bother if the user is just + # changing their profile info. + user = UserID.from_string(event.state_key) + yield user_joined_room(self.distributor, user, event.room_id) @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): @@ -733,9 +737,7 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=event.room_id - ) + yield user_joined_room(self.distributor, user, event.room_id) new_pdu = event @@ -1082,7 +1084,7 @@ class FederationHandler(BaseHandler): return self.store.get_min_depth(context) @log_function - def _on_user_joined(self, user, room_id): + def user_joined_room(self, user, room_id): waiters = self.waiting_for_join_list.get( (user.to_string(), room_id), [] diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 2a99921d5f..f1fa562fff 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -20,7 +20,6 @@ from synapse.api.errors import ( CodeMessageException ) from ._base import BaseHandler -from synapse.http.client import SimpleHttpClient from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError @@ -35,13 +34,12 @@ class IdentityHandler(BaseHandler): def __init__(self, hs): super(IdentityHandler, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + @defer.inlineCallbacks def threepid_from_creds(self, creds): yield run_on_reactor() - # TODO: get this from the homeserver rather than creating a new one for - # each request - http_client = SimpleHttpClient(self.hs) # XXX: make this configurable! # trustedIdServers = ['matrix.org', 'localhost:8090'] trustedIdServers = ['matrix.org', 'vector.im'] @@ -67,7 +65,7 @@ class IdentityHandler(BaseHandler): data = {} try: - data = yield http_client.get_json( + data = yield self.http_client.get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" @@ -85,7 +83,6 @@ class IdentityHandler(BaseHandler): def bind_threepid(self, creds, mxid): yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) - http_client = SimpleHttpClient(self.hs) data = None if 'id_server' in creds: @@ -103,7 +100,7 @@ class IdentityHandler(BaseHandler): raise SynapseError(400, "No client_secret in creds") try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), @@ -121,7 +118,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): yield run_on_reactor() - http_client = SimpleHttpClient(self.hs) params = { 'email': email, @@ -131,7 +127,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield http_client.post_urlencoded_get_json( + data = yield self.http_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 64c57375f7..ccdd3d8473 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,11 +26,17 @@ from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler +from canonicaljson import encode_canonical_json + import logging logger = logging.getLogger(__name__) +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -195,10 +201,8 @@ class MessageHandler(BaseHandler): if membership == Membership.JOIN: joinee = UserID.from_string(builder.state_key) # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", - joinee, - builder.content + yield collect_presencelike_data( + self.distributor, joinee, builder.content ) if token_id is not None: @@ -211,6 +215,16 @@ class MessageHandler(BaseHandler): builder=builder, ) + if event.is_state(): + prev_state = context.current_state.get((event.type, event.state_key)) + if prev_state and event.user_id == prev_state.user_id: + prev_content = encode_canonical_json(prev_state.content) + next_content = encode_canonical_json(event.content) + if prev_content == next_content: + # Duplicate suppression for state updates with same sender + # and content. + defer.returnValue(prev_state) + if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler yield member_handler.change_membership(event, context, is_guest=is_guest) @@ -359,6 +373,10 @@ class MessageHandler(BaseHandler): tags_by_room = yield self.store.get_tags_for_user(user_id) + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user(user_id) + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -436,14 +454,22 @@ class MessageHandler(BaseHandler): for c in current_state.values() ] - account_data = [] + account_data_events = [] tags = tags_by_room.get(event.room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - d["account_data"] = account_data + + account_data = account_data_by_room.get(event.room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + d["account_data"] = account_data_events except: logger.exception("Failed to get snapshot") @@ -456,9 +482,17 @@ class MessageHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) + account_data_events = [] + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + ret = { "rooms": rooms_ret, "presence": presence, + "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), } @@ -498,14 +532,22 @@ class MessageHandler(BaseHandler): user_id, room_id, pagin_config, membership, member_event_id, is_guest ) - account_data = [] + account_data_events = [] tags = yield self.store.get_tags_for_room(user_id, room_id) if tags: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - result["account_data"] = account_data + + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + result["account_data"] = account_data_events defer.returnValue(result) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aca65096fc..63d6f30a7b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -62,6 +62,14 @@ def partitionbool(l, func): return ret.get(True, []), ret.get(False, []) +def user_presence_changed(distributor, user, statuscache): + return distributor.fire("user_presence_changed", user, statuscache) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class PresenceHandler(BaseHandler): STATE_LEVELS = { @@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_state( target_user.localpart, state_to_store ) - yield self.distributor.fire( - "collect_presencelike_data", target_user, state - ) + yield collect_presencelike_data(self.distributor, target_user, state) if now_level > was_level: state["last_active"] = self.clock.time_msec() @@ -467,7 +473,7 @@ class PresenceHandler(BaseHandler): ) @defer.inlineCallbacks - def send_invite(self, observer_user, observed_user): + def send_presence_invite(self, observer_user, observed_user): """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler): room_ids=room_ids, statuscache=statuscache, ) - yield self.distributor.fire("user_presence_changed", user, statuscache) + yield user_presence_changed(self.distributor, user, statuscache) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap[user].get_state()["last_active"] ) - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) + yield collect_presencelike_data(self.distributor, user, state) if "last_active" in state: state = dict(state) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 799faffe53..576c6f09b4 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -28,6 +28,14 @@ import logging logger = logging.getLogger(__name__) +def changed_presencelike_data(distributor, user, state): + return distributor.fire("changed_presencelike_data", user, state) + + +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + class ProfileHandler(BaseHandler): def __init__(self, hs): @@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "displayname": new_displayname, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "displayname": new_displayname, + }) yield self._update_join_states(target_user) @@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - yield self.distributor.fire( - "changed_presencelike_data", target_user, { - "avatar_url": new_avatar_url, - } - ) + yield changed_presencelike_data(self.distributor, target_user, { + "avatar_url": new_avatar_url, + }) yield self._update_join_states(target_user) @@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler): "membership": Membership.JOIN, } - yield self.distributor.fire( - "collect_presencelike_data", user, content - ) + yield collect_presencelike_data(self.distributor, user, content) msg_handler = self.hs.get_handlers().message_handler try: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 493a087031..a037da0f70 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -31,6 +31,10 @@ import urllib logger = logging.getLogger(__name__) +def registered_user(distributor, user): + return distributor.fire("registered_user", user) + + class RegistrationHandler(BaseHandler): def __init__(self, hs): @@ -38,6 +42,7 @@ class RegistrationHandler(BaseHandler): self.distributor = hs.get_distributor() self.distributor.declare("registered_user") + self.captch_client = CaptchaServerHttpClient(hs) @defer.inlineCallbacks def check_username(self, localpart): @@ -98,7 +103,7 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) else: # autogen a random user ID attempts = 0 @@ -117,7 +122,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash) - self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except SynapseError: # if user id is taken, just generate another user_id = None @@ -167,7 +172,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - self.distributor.fire("registered_user", user) + registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -215,7 +220,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=None ) - yield self.distributor.fire("registered_user", user) + yield registered_user(self.distributor, user) except Exception, e: yield self.store.add_access_token_to_user(user_id, token) # Ignore Registration errors @@ -302,10 +307,7 @@ class RegistrationHandler(BaseHandler): """ Used only by c/s api v1 """ - # TODO: get this from the homeserver rather than creating a new one for - # each request - client = CaptchaServerHttpClient(self.hs) - data = yield client.post_urlencoded_get_raw( + data = yield self.captcha_client.post_urlencoded_get_raw( "http://www.google.com:80/recaptcha/api/verify", args={ 'privatekey': private_key, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 023b4001b8..116a998c42 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -41,6 +41,18 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" +def collect_presencelike_data(distributor, user, content): + return distributor.fire("collect_presencelike_data", user, content) + + +def user_left_room(distributor, user, room_id): + return distributor.fire("user_left_room", user=user, room_id=room_id) + + +def user_joined_room(distributor, user, room_id): + return distributor.fire("user_joined_room", user=user, room_id=room_id) + + class RoomCreationHandler(BaseHandler): PRESETS_DICT = { @@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.JOIN: user = UserID.from_string(event.user_id) - self.distributor.fire( - "user_left_room", user=user, room_id=event.room_id - ) + user_left_room(self.distributor, user, event.room_id) defer.returnValue({"room_id": room_id}) @@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler): raise SynapseError(404, "No known servers") # If event doesn't include a display name, add one. - yield self.distributor.fire( - "collect_presencelike_data", joinee, content - ) + yield collect_presencelike_data(self.distributor, joinee, content) content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ @@ -517,10 +525,13 @@ class RoomMemberHandler(BaseHandler): do_auth=do_auth, ) - user = UserID.from_string(event.user_id) - yield self.distributor.fire( - "user_joined_room", user=user, room_id=room_id - ) + prev_state = context.current_state.get((event.type, event.state_key)) + if not prev_state or prev_state.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + user = UserID.from_string(event.user_id) + yield user_joined_room(self.distributor, user, room_id) @defer.inlineCallbacks def get_inviter(self, event): diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 0e58404148..bc79564287 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -240,9 +240,18 @@ class SearchHandler(BaseHandler): last_event_id = room_events[-1].event_id pagination_token = results_map[last_event_id]["pagination_token"] - global_next_batch = encode_base64("%s\n%s\n%s" % ( - "all", "", pagination_token - )) + # We want to respect the given batch group and group keys so + # that if people blindly use the top level `next_batch` token + # it returns more from the same group (if applicable) rather + # than reverting to searching all results again. + if batch_group and batch_group_key: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + batch_group, batch_group_key, pagination_token + )) + else: + global_next_batch = encode_base64("%s\n%s\n%s" % ( + "all", "", pagination_token + )) for room_id, group in room_groups.items(): group["next_batch"] = encode_base64("%s\n%s\n%s" % ( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 877328b29e..24c2b2fad6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -100,6 +100,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. + "account_data", # List of account_data events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. @@ -185,13 +186,19 @@ class SyncHandler(BaseHandler): pagination_config=pagination_config.get_source_config("presence"), key=None ) + + membership_list = (Membership.INVITE, Membership.JOIN) + if sync_config.filter.include_leave: + membership_list += (Membership.LEAVE, Membership.BAN) + room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=( - Membership.INVITE, - Membership.JOIN, - Membership.LEAVE, - Membership.BAN + membership_list=membership_list + ) + + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() ) ) @@ -211,6 +218,7 @@ class SyncHandler(BaseHandler): timeline_since_token=timeline_since_token, ephemeral_by_room=ephemeral_by_room, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -230,11 +238,13 @@ class SyncHandler(BaseHandler): leave_token=leave_token, timeline_since_token=timeline_since_token, tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, ) archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -244,7 +254,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -262,19 +273,38 @@ class SyncHandler(BaseHandler): state=current_state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) - def account_data_for_room(self, room_id, tags_by_room): - account_data = [] + def account_data_for_user(self, account_data): + account_data_events = [] + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events + + def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): + account_data_events = [] tags = tags_by_room.get(room_id) if tags is not None: - account_data.append({ + account_data_events.append({ "type": "m.tag", "content": {"tags": tags}, }) - return account_data + + account_data = account_data_by_room.get(room_id, {}) + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + return account_data_events @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): @@ -341,7 +371,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token, tags_by_room): + timeline_since_token, tags_by_room, + account_data_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -358,7 +389,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=leave_state, account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), )) @@ -415,6 +446,13 @@ class SyncHandler(BaseHandler): since_token.account_data_key, ) + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + sync_config.user.to_string(), + since_token.account_data_key, + ) + ) + joined = [] archived = [] if len(room_events) <= timeline_limit: @@ -469,7 +507,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -492,14 +530,15 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room + ephemeral_by_room, tags_by_room, account_data_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token, tags_by_room + sync_config, leave_event, since_token, tags_by_room, + account_data_by_room ) archived.append(room_sync) @@ -510,6 +549,7 @@ class SyncHandler(BaseHandler): defer.returnValue(SyncResult( presence=presence, + account_data=self.account_data_for_user(account_data), joined=joined, invited=invited, archived=archived, @@ -566,7 +606,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - ephemeral_by_room, tags_by_room): + ephemeral_by_room, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -606,7 +647,7 @@ class SyncHandler(BaseHandler): state=state, ephemeral=ephemeral_by_room.get(room_id, []), account_data=self.account_data_for_room( - room_id, tags_by_room + room_id, tags_by_room, account_data_by_room ), ) @@ -616,7 +657,8 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token, tags_by_room): + since_token, tags_by_room, + account_data_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -654,7 +696,7 @@ class SyncHandler(BaseHandler): timeline=batch, state=state_events_delta, account_data=self.account_data_for_room( - leave_event.room_id, tags_by_room + leave_event.room_id, tags_by_room, account_data_by_room ), ) diff --git a/synapse/http/server.py b/synapse/http/server.py index 50feea6f1c..c44bdfc888 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -53,6 +53,23 @@ response_timer = metrics.register_distribution( labels=["method", "servlet"] ) +response_ru_utime = metrics.register_distribution( + "response_ru_utime", labels=["method", "servlet"] +) + +response_ru_stime = metrics.register_distribution( + "response_ru_stime", labels=["method", "servlet"] +) + +response_db_txn_count = metrics.register_distribution( + "response_db_txn_count", labels=["method", "servlet"] +) + +response_db_txn_duration = metrics.register_distribution( + "response_db_txn_duration", labels=["method", "servlet"] +) + + _next_request_id = 0 @@ -120,7 +137,7 @@ class HttpServer(object): """ Interface for registering callbacks on a HTTP server """ - def register_path(self, method, path_pattern, callback): + def register_paths(self, method, path_patterns, callback): """ Register a callback that gets fired if we receive a http request with the given method for a path that matches the given regex. @@ -129,7 +146,7 @@ class HttpServer(object): Args: method (str): The method to listen to. - path_pattern (str): The regex used to match requests. + path_patterns (list<SRE_Pattern>): The regex used to match requests. callback (function): The function to fire if we receive a matched request. The first argument will be the request object and subsequent arguments will be any matched groups from the regex. @@ -165,10 +182,11 @@ class JsonResource(HttpServer, resource.Resource): self.version_string = hs.version_string self.hs = hs - def register_path(self, method, path_pattern, callback): - self.path_regexs.setdefault(method, []).append( - self._PathEntry(path_pattern, callback) - ) + def register_paths(self, method, path_patterns, callback): + for path_pattern in path_patterns: + self.path_regexs.setdefault(method, []).append( + self._PathEntry(path_pattern, callback) + ) def render(self, request): """ This gets called by twisted every time someone sends us a request. @@ -220,6 +238,21 @@ class JsonResource(HttpServer, resource.Resource): self.clock.time_msec() - start, request.method, servlet_classname ) + try: + context = LoggingContext.current_context() + ru_utime, ru_stime = context.get_resource_usage() + + response_ru_utime.inc_by(ru_utime, request.method, servlet_classname) + response_ru_stime.inc_by(ru_stime, request.method, servlet_classname) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname + ) + except: + pass + return # Huh. No one wanted to handle that? Fiiiiiine. Send 400. diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 9cda17fcf8..32b6d6cd72 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -19,7 +19,6 @@ from synapse.api.errors import SynapseError import logging - logger = logging.getLogger(__name__) @@ -102,12 +101,13 @@ class RestServlet(object): def register(self, http_server): """ Register this servlet with the given HTTP server. """ - if hasattr(self, "PATTERN"): - pattern = self.PATTERN + if hasattr(self, "PATTERNS"): + patterns = self.PATTERNS for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"): if hasattr(self, "on_%s" % (method,)): method_handler = getattr(self, "on_%s" % (method,)) - http_server.register_path(method, pattern, method_handler) + http_server.register_paths(method, patterns, method_handler) + else: raise NotImplementedError("RestServlet must register something.") diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 1f015a7f2e..7f76382a17 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -247,6 +247,7 @@ def make_base_append_underride_rules(user): }, { 'rule_id': 'global/underride/.m.rule.message', + 'enabled': False, 'conditions': [ { 'kind': 'event_match', diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index a02fed57b4..5160775e59 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -14,7 +14,6 @@ # limitations under the License. from synapse.push import Pusher, PusherConfigException -from synapse.http.client import SimpleHttpClient from twisted.internet import defer @@ -46,7 +45,7 @@ class HttpPusher(Pusher): "'url' required in data for HTTP pusher" ) self.url = data['url'] - self.httpCli = SimpleHttpClient(self.hs) + self.http_client = _hs.get_simple_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url['url'] @@ -107,7 +106,7 @@ class HttpPusher(Pusher): if not notification_dict: defer.returnValue([]) try: - resp = yield self.httpCli.post_json_get_json(self.url, notification_dict) + resp = yield self.http_client.post_json_get_json(self.url, notification_dict) except: logger.warn("Failed to push %s ", self.url) defer.returnValue(False) @@ -138,7 +137,7 @@ class HttpPusher(Pusher): } } try: - resp = yield self.httpCli.post_json_get_json(self.url, d) + resp = yield self.http_client.post_json_get_json(self.url, d) except: logger.exception("Failed to push %s ", self.url) defer.returnValue(False) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index bdde43864c..886199a6da 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError from synapse.types import UserID -from base import ClientV1RestServlet, client_path_pattern +from base import ClientV1RestServlet, client_path_patterns import logging @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class WhoisRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/admin/whois/(?P<user_id>[^/]*)") + PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py index 504a5e432f..6273ce0795 100644 --- a/synapse/rest/client/v1/base.py +++ b/synapse/rest/client/v1/base.py @@ -27,7 +27,7 @@ import logging logger = logging.getLogger(__name__) -def client_path_pattern(path_regex): +def client_path_patterns(path_regex, releases=(0,), include_in_unstable=True): """Creates a regex compiled client path with the correct client path prefix. @@ -37,7 +37,14 @@ def client_path_pattern(path_regex): Returns: SRE_Pattern """ - return re.compile("^" + CLIENT_PREFIX + path_regex) + patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)] + if include_in_unstable: + unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable") + patterns.append(re.compile("^" + unstable_prefix + path_regex)) + for release in releases: + new_prefix = CLIENT_PREFIX.replace("/api/v1", "/r%d" % release) + patterns.append(re.compile("^" + new_prefix + path_regex)) + return patterns class ClientV1RestServlet(RestServlet): diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 240eedac75..f488e2dd41 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import RoomAlias -from .base import ClientV1RestServlet, client_path_pattern +from .base import ClientV1RestServlet, client_path_patterns import simplejson as json import logging @@ -32,7 +32,7 @@ def register_servlets(hs, http_server): class ClientDirectoryServer(ClientV1RestServlet): - PATTERN = client_path_pattern("/directory/room/(?P<room_alias>[^/]*)$") + PATTERNS = client_path_patterns("/directory/room/(?P<room_alias>[^/]*)$") @defer.inlineCallbacks def on_GET(self, request, room_alias): diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index 3e1750d1a1..41b97e7d15 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.streams.config import PaginationConfig -from .base import ClientV1RestServlet, client_path_pattern +from .base import ClientV1RestServlet, client_path_patterns from synapse.events.utils import serialize_event import logging @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) class EventStreamRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/events$") + PATTERNS = client_path_patterns("/events$") DEFAULT_LONGPOLL_TIME_MS = 30000 @@ -72,7 +72,7 @@ class EventStreamRestServlet(ClientV1RestServlet): # TODO: Unit test gets, with and without auth, with different kinds of events. class EventRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/events/(?P<event_id>[^/]*)$") + PATTERNS = client_path_patterns("/events/(?P<event_id>[^/]*)$") def __init__(self, hs): super(EventRestServlet, self).__init__(hs) diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index 856a70f297..9ad3df8a9f 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -16,12 +16,12 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig -from base import ClientV1RestServlet, client_path_pattern +from base import ClientV1RestServlet, client_path_patterns # TODO: Needs unit testing class InitialSyncRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/initialSync$") + PATTERNS = client_path_patterns("/initialSync$") @defer.inlineCallbacks def on_GET(self, request): diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 720d6358e7..776e1667c1 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -16,9 +16,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, LoginError, Codes -from synapse.http.client import SimpleHttpClient from synapse.types import UserID -from base import ClientV1RestServlet, client_path_pattern +from base import ClientV1RestServlet, client_path_patterns import simplejson as json import urllib @@ -36,7 +35,7 @@ logger = logging.getLogger(__name__) class LoginRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/login$") + PATTERNS = client_path_patterns("/login$", releases=(), include_in_unstable=False) PASS_TYPE = "m.login.password" SAML2_TYPE = "m.login.saml2" CAS_TYPE = "m.login.cas" @@ -51,6 +50,7 @@ class LoginRestServlet(ClientV1RestServlet): self.cas_server_url = hs.config.cas_server_url self.cas_required_attributes = hs.config.cas_required_attributes self.servername = hs.config.server_name + self.http_client = hs.get_simple_http_client() def on_GET(self, request): flows = [] @@ -98,15 +98,12 @@ class LoginRestServlet(ClientV1RestServlet): # TODO Delete this after all CAS clients switch to token login instead elif self.cas_enabled and (login_submission["type"] == LoginRestServlet.CAS_TYPE): - # TODO: get this from the homeserver rather than creating a new one for - # each request - http_client = SimpleHttpClient(self.hs) uri = "%s/proxyValidate" % (self.cas_server_url,) args = { "ticket": login_submission["ticket"], "service": login_submission["service"] } - body = yield http_client.get_raw(uri, args) + body = yield self.http_client.get_raw(uri, args) result = yield self.do_cas_login(body) defer.returnValue(result) elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE: @@ -238,7 +235,7 @@ class LoginRestServlet(ClientV1RestServlet): class SAML2RestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/login/saml2") + PATTERNS = client_path_patterns("/login/saml2", releases=()) def __init__(self, hs): super(SAML2RestServlet, self).__init__(hs) @@ -282,7 +279,7 @@ class SAML2RestServlet(ClientV1RestServlet): # TODO Delete this after all CAS clients switch to token login instead class CasRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/login/cas") + PATTERNS = client_path_patterns("/login/cas", releases=()) def __init__(self, hs): super(CasRestServlet, self).__init__(hs) @@ -293,7 +290,7 @@ class CasRestServlet(ClientV1RestServlet): class CasRedirectServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/login/cas/redirect") + PATTERNS = client_path_patterns("/login/cas/redirect", releases=()) def __init__(self, hs): super(CasRedirectServlet, self).__init__(hs) @@ -316,7 +313,7 @@ class CasRedirectServlet(ClientV1RestServlet): class CasTicketServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/login/cas/ticket") + PATTERNS = client_path_patterns("/login/cas/ticket", releases=()) def __init__(self, hs): super(CasTicketServlet, self).__init__(hs) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 6fe5d19a22..e0949fe4bb 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -19,7 +19,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import UserID -from .base import ClientV1RestServlet, client_path_pattern +from .base import ClientV1RestServlet, client_path_patterns import simplejson as json import logging @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) class PresenceStatusRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/presence/(?P<user_id>[^/]*)/status") + PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -73,7 +73,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): class PresenceListRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/presence/list/(?P<user_id>[^/]*)") + PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -120,7 +120,7 @@ class PresenceListRestServlet(ClientV1RestServlet): if len(u) == 0: continue invited_user = UserID.from_string(u) - yield self.handlers.presence_handler.send_invite( + yield self.handlers.presence_handler.send_presence_invite( observer_user=user, observed_user=invited_user ) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 3218e47025..e6c6e5d024 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -16,14 +16,14 @@ """ This module contains REST servlets to do with profile: /profile/<paths> """ from twisted.internet import defer -from .base import ClientV1RestServlet, client_path_pattern +from .base import ClientV1RestServlet, client_path_patterns from synapse.types import UserID import simplejson as json class ProfileDisplaynameRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)/displayname") + PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/displayname") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -56,7 +56,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): class ProfileAvatarURLRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)/avatar_url") + PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/avatar_url") @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -89,7 +89,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): class ProfileRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)") + PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)") @defer.inlineCallbacks def on_GET(self, request, user_id): diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index b0870db1ac..9270bdd079 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import ( SynapseError, Codes, UnrecognizedRequestError, NotFoundError, StoreError ) -from .base import ClientV1RestServlet, client_path_pattern +from .base import ClientV1RestServlet, client_path_patterns from synapse.storage.push_rule import ( InconsistentRuleException, RuleNotFoundException ) @@ -31,7 +31,7 @@ import simplejson as json class PushRuleRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/pushrules/.*$") + PATTERNS = client_path_patterns("/pushrules/.*$") SLIGHTLY_PEDANTIC_TRAILING_SLASH_ERROR = ( "Unrecognised request: You probably wanted a trailing slash") @@ -207,7 +207,12 @@ class PushRuleRestServlet(ClientV1RestServlet): def set_rule_attr(self, user_name, spec, val): if spec['attr'] == 'enabled': + if isinstance(val, dict) and "enabled" in val: + val = val["enabled"] if not isinstance(val, bool): + # Legacy fallback + # This should *actually* take a dict, but many clients pass + # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) self.hs.get_datastore().set_push_rule_enabled( diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index a110c0a4f0..d6d1ad528e 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -17,13 +17,16 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.push import PusherConfigException -from .base import ClientV1RestServlet, client_path_pattern +from .base import ClientV1RestServlet, client_path_patterns import simplejson as json +import logging + +logger = logging.getLogger(__name__) class PusherRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/pushers/set$") + PATTERNS = client_path_patterns("/pushers/set$") @defer.inlineCallbacks def on_POST(self, request): @@ -51,6 +54,9 @@ class PusherRestServlet(ClientV1RestServlet): raise SynapseError(400, "Missing parameters: "+','.join(missing), errcode=Codes.MISSING_PARAM) + logger.debug("set pushkey %s to kind %s", content['pushkey'], content['kind']) + logger.debug("Got pushers request with body: %r", content) + append = False if 'append' in content: append = content['append'] diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index a56834e365..4b02311e05 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.api.constants import LoginType -from base import ClientV1RestServlet, client_path_pattern +from base import ClientV1RestServlet, client_path_patterns import synapse.util.stringutils as stringutils from synapse.util.async import run_on_reactor @@ -48,7 +48,7 @@ class RegisterRestServlet(ClientV1RestServlet): handler doesn't have a concept of multi-stages or sessions. """ - PATTERN = client_path_pattern("/register$") + PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False) def __init__(self, hs): super(RegisterRestServlet, self).__init__(hs) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 6952d269ec..53cc29becb 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -16,7 +16,7 @@ """ This module contains REST servlets to do with rooms: /rooms/<paths> """ from twisted.internet import defer -from base import ClientV1RestServlet, client_path_pattern +from base import ClientV1RestServlet, client_path_patterns from synapse.api.errors import SynapseError, Codes, AuthError from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership @@ -34,16 +34,16 @@ class RoomCreateRestServlet(ClientV1RestServlet): # No PATTERN; we have custom dispatch rules here def register(self, http_server): - PATTERN = "/createRoom" - register_txn_path(self, PATTERN, http_server) + PATTERNS = "/createRoom" + register_txn_path(self, PATTERNS, http_server) # define CORS for all of /rooms in RoomCreateRestServlet for simplicity - http_server.register_path("OPTIONS", - client_path_pattern("/rooms(?:/.*)?$"), - self.on_OPTIONS) + http_server.register_paths("OPTIONS", + client_path_patterns("/rooms(?:/.*)?$"), + self.on_OPTIONS) # define CORS for /createRoom[/txnid] - http_server.register_path("OPTIONS", - client_path_pattern("/createRoom(?:/.*)?$"), - self.on_OPTIONS) + http_server.register_paths("OPTIONS", + client_path_patterns("/createRoom(?:/.*)?$"), + self.on_OPTIONS) @defer.inlineCallbacks def on_PUT(self, request, txn_id): @@ -103,18 +103,18 @@ class RoomStateEventRestServlet(ClientV1RestServlet): state_key = ("/rooms/(?P<room_id>[^/]*)/state/" "(?P<event_type>[^/]*)/(?P<state_key>[^/]*)$") - http_server.register_path("GET", - client_path_pattern(state_key), - self.on_GET) - http_server.register_path("PUT", - client_path_pattern(state_key), - self.on_PUT) - http_server.register_path("GET", - client_path_pattern(no_state_key), - self.on_GET_no_state_key) - http_server.register_path("PUT", - client_path_pattern(no_state_key), - self.on_PUT_no_state_key) + http_server.register_paths("GET", + client_path_patterns(state_key), + self.on_GET) + http_server.register_paths("PUT", + client_path_patterns(state_key), + self.on_PUT) + http_server.register_paths("GET", + client_path_patterns(no_state_key), + self.on_GET_no_state_key) + http_server.register_paths("PUT", + client_path_patterns(no_state_key), + self.on_PUT_no_state_key) def on_GET_no_state_key(self, request, room_id, event_type): return self.on_GET(request, room_id, event_type, "") @@ -170,8 +170,8 @@ class RoomSendEventRestServlet(ClientV1RestServlet): def register(self, http_server): # /rooms/$roomid/send/$event_type[/$txn_id] - PATTERN = ("/rooms/(?P<room_id>[^/]*)/send/(?P<event_type>[^/]*)") - register_txn_path(self, PATTERN, http_server, with_get=True) + PATTERNS = ("/rooms/(?P<room_id>[^/]*)/send/(?P<event_type>[^/]*)") + register_txn_path(self, PATTERNS, http_server, with_get=True) @defer.inlineCallbacks def on_POST(self, request, room_id, event_type, txn_id=None): @@ -215,8 +215,8 @@ class JoinRoomAliasServlet(ClientV1RestServlet): def register(self, http_server): # /join/$room_identifier[/$txn_id] - PATTERN = ("/join/(?P<room_identifier>[^/]*)") - register_txn_path(self, PATTERN, http_server) + PATTERNS = ("/join/(?P<room_identifier>[^/]*)") + register_txn_path(self, PATTERNS, http_server) @defer.inlineCallbacks def on_POST(self, request, room_identifier, txn_id=None): @@ -280,7 +280,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): # TODO: Needs unit testing class PublicRoomListRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/publicRooms$") + PATTERNS = client_path_patterns("/publicRooms$") @defer.inlineCallbacks def on_GET(self, request): @@ -291,7 +291,7 @@ class PublicRoomListRestServlet(ClientV1RestServlet): # TODO: Needs unit testing class RoomMemberListRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/members$") + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/members$") @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -328,7 +328,7 @@ class RoomMemberListRestServlet(ClientV1RestServlet): # TODO: Needs better unit testing class RoomMessageListRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/messages$") + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$") @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -351,7 +351,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): # TODO: Needs unit testing class RoomStateRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/state$") + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/state$") @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -368,7 +368,7 @@ class RoomStateRestServlet(ClientV1RestServlet): # TODO: Needs unit testing class RoomInitialSyncRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/initialSync$") + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/initialSync$") @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -383,32 +383,8 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet): defer.returnValue((200, content)) -class RoomTriggerBackfill(ClientV1RestServlet): - PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/backfill$") - - def __init__(self, hs): - super(RoomTriggerBackfill, self).__init__(hs) - self.clock = hs.get_clock() - - @defer.inlineCallbacks - def on_GET(self, request, room_id): - remote_server = urllib.unquote( - request.args["remote"][0] - ).decode("UTF-8") - - limit = int(request.args["limit"][0]) - - handler = self.handlers.federation_handler - events = yield handler.backfill(remote_server, room_id, limit) - - time_now = self.clock.time_msec() - - res = [serialize_event(event, time_now) for event in events] - defer.returnValue((200, res)) - - class RoomEventContext(ClientV1RestServlet): - PATTERN = client_path_pattern( + PATTERNS = client_path_patterns( "/rooms/(?P<room_id>[^/]*)/context/(?P<event_id>[^/]*)$" ) @@ -447,9 +423,9 @@ class RoomMembershipRestServlet(ClientV1RestServlet): def register(self, http_server): # /rooms/$roomid/[invite|join|leave] - PATTERN = ("/rooms/(?P<room_id>[^/]*)/" - "(?P<membership_action>join|invite|leave|ban|kick|forget)") - register_txn_path(self, PATTERN, http_server) + PATTERNS = ("/rooms/(?P<room_id>[^/]*)/" + "(?P<membership_action>join|invite|leave|ban|kick|forget)") + register_txn_path(self, PATTERNS, http_server) @defer.inlineCallbacks def on_POST(self, request, room_id, membership_action, txn_id=None): @@ -543,8 +519,8 @@ class RoomMembershipRestServlet(ClientV1RestServlet): class RoomRedactEventRestServlet(ClientV1RestServlet): def register(self, http_server): - PATTERN = ("/rooms/(?P<room_id>[^/]*)/redact/(?P<event_id>[^/]*)") - register_txn_path(self, PATTERN, http_server) + PATTERNS = ("/rooms/(?P<room_id>[^/]*)/redact/(?P<event_id>[^/]*)") + register_txn_path(self, PATTERNS, http_server) @defer.inlineCallbacks def on_POST(self, request, room_id, event_id, txn_id=None): @@ -582,7 +558,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): class RoomTypingRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern( + PATTERNS = client_path_patterns( "/rooms/(?P<room_id>[^/]*)/typing/(?P<user_id>[^/]*)$" ) @@ -615,7 +591,7 @@ class RoomTypingRestServlet(ClientV1RestServlet): class SearchRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern( + PATTERNS = client_path_patterns( "/search$" ) @@ -655,20 +631,20 @@ def register_txn_path(servlet, regex_string, http_server, with_get=False): http_server : The http_server to register paths with. with_get: True to also register respective GET paths for the PUTs. """ - http_server.register_path( + http_server.register_paths( "POST", - client_path_pattern(regex_string + "$"), + client_path_patterns(regex_string + "$"), servlet.on_POST ) - http_server.register_path( + http_server.register_paths( "PUT", - client_path_pattern(regex_string + "/(?P<txn_id>[^/]*)$"), + client_path_patterns(regex_string + "/(?P<txn_id>[^/]*)$"), servlet.on_PUT ) if with_get: - http_server.register_path( + http_server.register_paths( "GET", - client_path_pattern(regex_string + "/(?P<txn_id>[^/]*)$"), + client_path_patterns(regex_string + "/(?P<txn_id>[^/]*)$"), servlet.on_GET ) @@ -679,7 +655,6 @@ def register_servlets(hs, http_server): RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) - RoomTriggerBackfill(hs).register(http_server) RoomMembershipRestServlet(hs).register(http_server) RoomSendEventRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py index eb7c57cade..1567a03c89 100644 --- a/synapse/rest/client/v1/voip.py +++ b/synapse/rest/client/v1/voip.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from base import ClientV1RestServlet, client_path_pattern +from base import ClientV1RestServlet, client_path_patterns import hmac @@ -24,7 +24,7 @@ import base64 class VoipRestServlet(ClientV1RestServlet): - PATTERN = client_path_pattern("/voip/turnServer$") + PATTERNS = client_path_patterns("/voip/turnServer$") @defer.inlineCallbacks def on_GET(self, request): diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index a108132346..d7b59c84d1 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -23,6 +23,7 @@ from . import ( keys, tokenrefresh, tags, + account_data, ) from synapse.http.server import JsonResource @@ -46,3 +47,4 @@ class ClientV2AlphaRestResource(JsonResource): keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) + account_data.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 4540e8dcf7..7b8b879c03 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -27,7 +27,7 @@ import simplejson logger = logging.getLogger(__name__) -def client_v2_pattern(path_regex): +def client_v2_patterns(path_regex, releases=(0,)): """Creates a regex compiled client path with the correct client path prefix. @@ -37,7 +37,13 @@ def client_v2_pattern(path_regex): Returns: SRE_Pattern """ - return re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex) + patterns = [re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex)] + unstable_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/unstable") + patterns.append(re.compile("^" + unstable_prefix + path_regex)) + for release in releases: + new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) + patterns.append(re.compile("^" + new_prefix + path_regex)) + return patterns def parse_request_allow_empty(request): diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 1970ad3458..3e1459d5b9 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -20,7 +20,7 @@ from synapse.api.errors import LoginError, SynapseError, Codes from synapse.http.servlet import RestServlet from synapse.util.async import run_on_reactor -from ._base import client_v2_pattern, parse_json_dict_from_request +from ._base import client_v2_patterns, parse_json_dict_from_request import logging @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) class PasswordRestServlet(RestServlet): - PATTERN = client_v2_pattern("/account/password") + PATTERNS = client_v2_patterns("/account/password") def __init__(self, hs): super(PasswordRestServlet, self).__init__() @@ -89,7 +89,7 @@ class PasswordRestServlet(RestServlet): class ThreepidRestServlet(RestServlet): - PATTERN = client_v2_pattern("/account/3pid") + PATTERNS = client_v2_patterns("/account/3pid") def __init__(self, hs): super(ThreepidRestServlet, self).__init__() diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py new file mode 100644 index 0000000000..5b8f454bf1 --- /dev/null +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet +from synapse.api.errors import AuthError, SynapseError + +from twisted.internet import defer + +import logging + +import simplejson as json + +logger = logging.getLogger(__name__) + + +class AccountDataServlet(RestServlet): + """ + PUT /user/{user_id}/account_data/{account_dataType} HTTP/1.1 + """ + PATTERNS = client_v2_patterns( + "/user/(?P<user_id>[^/]*)/account_data/(?P<account_data_type>[^/]*)" + ) + + def __init__(self, hs): + super(AccountDataServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, account_data_type): + auth_user, _, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add account data for other users.") + + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid JSON") + + max_id = yield self.store.add_account_data_for_user( + user_id, account_data_type, body + ) + + yield self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + defer.returnValue((200, {})) + + +class RoomAccountDataServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/account_data/{account_dataType} HTTP/1.1 + """ + PATTERNS = client_v2_patterns( + "/user/(?P<user_id>[^/]*)" + "/rooms/(?P<room_id>[^/]*)" + "/account_data/(?P<account_data_type>[^/]*)" + ) + + def __init__(self, hs): + super(RoomAccountDataServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def on_PUT(self, request, user_id, room_id, account_data_type): + auth_user, _, _ = yield self.auth.get_user_by_req(request) + if user_id != auth_user.to_string(): + raise AuthError(403, "Cannot add account data for other users.") + + try: + content_bytes = request.content.read() + body = json.loads(content_bytes) + except: + raise SynapseError(400, "Invalid JSON") + + if not isinstance(body, dict): + raise ValueError("Expected a JSON object") + + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, account_data_type, body + ) + + yield self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + AccountDataServlet(hs).register(http_server) + RoomAccountDataServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 4c726f05f5..fb5947a141 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -20,7 +20,7 @@ from synapse.api.errors import SynapseError from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX from synapse.http.servlet import RestServlet -from ._base import client_v2_pattern +from ._base import client_v2_patterns import logging @@ -97,7 +97,7 @@ class AuthRestServlet(RestServlet): cannot be handled in the normal flow (with requests to the same endpoint). Current use is for web fallback auth. """ - PATTERN = client_v2_pattern("/auth/(?P<stagetype>[\w\.]*)/fallback/web") + PATTERNS = client_v2_patterns("/auth/(?P<stagetype>[\w\.]*)/fallback/web") def __init__(self, hs): super(AuthRestServlet, self).__init__() diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index 97956a4b91..3cd0364b56 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -19,7 +19,7 @@ from synapse.api.errors import AuthError, SynapseError from synapse.http.servlet import RestServlet from synapse.types import UserID -from ._base import client_v2_pattern +from ._base import client_v2_patterns import simplejson as json import logging @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) class GetFilterRestServlet(RestServlet): - PATTERN = client_v2_pattern("/user/(?P<user_id>[^/]*)/filter/(?P<filter_id>[^/]*)") + PATTERNS = client_v2_patterns("/user/(?P<user_id>[^/]*)/filter/(?P<filter_id>[^/]*)") def __init__(self, hs): super(GetFilterRestServlet, self).__init__() @@ -65,7 +65,7 @@ class GetFilterRestServlet(RestServlet): class CreateFilterRestServlet(RestServlet): - PATTERN = client_v2_pattern("/user/(?P<user_id>[^/]*)/filter") + PATTERNS = client_v2_patterns("/user/(?P<user_id>[^/]*)/filter") def __init__(self, hs): super(CreateFilterRestServlet, self).__init__() diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 820d33336f..753f2988a1 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -21,7 +21,7 @@ from synapse.types import UserID from canonicaljson import encode_canonical_json -from ._base import client_v2_pattern +from ._base import client_v2_patterns import simplejson as json import logging @@ -54,7 +54,7 @@ class KeyUploadServlet(RestServlet): }, } """ - PATTERN = client_v2_pattern("/keys/upload/(?P<device_id>[^/]*)") + PATTERNS = client_v2_patterns("/keys/upload/(?P<device_id>[^/]*)", releases=()) def __init__(self, hs): super(KeyUploadServlet, self).__init__() @@ -154,12 +154,13 @@ class KeyQueryServlet(RestServlet): } } } } } } """ - PATTERN = client_v2_pattern( + PATTERNS = client_v2_patterns( "/keys/query(?:" "/(?P<user_id>[^/]*)(?:" "/(?P<device_id>[^/]*)" ")?" - ")?" + ")?", + releases=() ) def __init__(self, hs): @@ -245,10 +246,11 @@ class OneTimeKeyServlet(RestServlet): } } } } """ - PATTERN = client_v2_pattern( + PATTERNS = client_v2_patterns( "/keys/claim(?:/?|(?:/" "(?P<user_id>[^/]*)/(?P<device_id>[^/]*)/(?P<algorithm>[^/]*)" - ")?)" + ")?)", + releases=() ) def __init__(self, hs): diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index 788acd4adb..aa214e13b6 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.http.servlet import RestServlet -from ._base import client_v2_pattern +from ._base import client_v2_patterns import logging @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class ReceiptRestServlet(RestServlet): - PATTERN = client_v2_pattern( + PATTERNS = client_v2_patterns( "/rooms/(?P<room_id>[^/]*)" "/receipt/(?P<receipt_type>[^/]*)" "/(?P<event_id>[^/]*)$" diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index f899376311..b2b89652c6 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -19,7 +19,7 @@ from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import RestServlet -from ._base import client_v2_pattern, parse_json_dict_from_request +from ._base import client_v2_patterns, parse_json_dict_from_request import logging import hmac @@ -41,7 +41,7 @@ logger = logging.getLogger(__name__) class RegisterRestServlet(RestServlet): - PATTERN = client_v2_pattern("/register") + PATTERNS = client_v2_patterns("/register") def __init__(self, hs): super(RegisterRestServlet, self).__init__() diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 775f49885b..4efe802487 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -25,7 +25,7 @@ from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_room_id, ) from synapse.api.filtering import FilterCollection -from ._base import client_v2_pattern +from ._base import client_v2_patterns import copy import logging @@ -69,7 +69,7 @@ class SyncRestServlet(RestServlet): } """ - PATTERN = client_v2_pattern("/sync$") + PATTERNS = client_v2_patterns("/sync$") ALLOWED_PRESENCE = set(["online", "offline"]) def __init__(self, hs): @@ -144,6 +144,9 @@ class SyncRestServlet(RestServlet): ) response_content = { + "account_data": self.encode_account_data( + sync_result.account_data, filter, time_now + ), "presence": self.encode_presence( sync_result.presence, filter, time_now ), @@ -165,6 +168,9 @@ class SyncRestServlet(RestServlet): formatted.append(event) return {"events": filter.filter_presence(formatted)} + def encode_account_data(self, events, filter, time_now): + return {"events": filter.filter_account_data(events)} + def encode_joined(self, rooms, filter, time_now, token_id): """ Encode the joined rooms in a sync result diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index ba7223be11..b5d0db5569 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import client_v2_pattern +from ._base import client_v2_patterns from synapse.http.servlet import RestServlet from synapse.api.errors import AuthError, SynapseError @@ -31,7 +31,7 @@ class TagListServlet(RestServlet): """ GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1 """ - PATTERN = client_v2_pattern( + PATTERNS = client_v2_patterns( "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags" ) @@ -56,7 +56,7 @@ class TagServlet(RestServlet): PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1 """ - PATTERN = client_v2_pattern( + PATTERNS = client_v2_patterns( "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags/(?P<tag>[^/]*)" ) diff --git a/synapse/rest/client/v2_alpha/tokenrefresh.py b/synapse/rest/client/v2_alpha/tokenrefresh.py index 901e777983..5a63afd51e 100644 --- a/synapse/rest/client/v2_alpha/tokenrefresh.py +++ b/synapse/rest/client/v2_alpha/tokenrefresh.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.http.servlet import RestServlet -from ._base import client_v2_pattern, parse_json_dict_from_request +from ._base import client_v2_patterns, parse_json_dict_from_request class TokenRefreshRestServlet(RestServlet): @@ -26,7 +26,7 @@ class TokenRefreshRestServlet(RestServlet): Exchanges refresh tokens for a pair of an access token and a new refresh token. """ - PATTERN = client_v2_pattern("/tokenrefresh") + PATTERNS = client_v2_patterns("/tokenrefresh") def __init__(self, hs): super(TokenRefreshRestServlet, self).__init__() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e7443f2838..c46b653f11 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -42,6 +42,7 @@ from .end_to_end_keys import EndToEndKeyStore from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore +from .account_data import AccountDataStore import logging @@ -73,6 +74,7 @@ class DataStore(RoomMemberStore, RoomStore, EndToEndKeyStore, SearchStore, TagsStore, + AccountDataStore, ): def __init__(self, hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 218e708054..17a14e001c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -214,7 +214,8 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): + def _new_transaction(self, conn, desc, after_callbacks, logging_context, + func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -277,6 +278,9 @@ class SQLBaseStore(object): end = time.time() * 1000 duration = end - start + if logging_context is not None: + logging_context.add_database_transaction(duration) + transaction_logger.debug("[TXN END] {%s} %f", name, duration) self._current_txn_total_time += duration @@ -302,7 +306,8 @@ class SQLBaseStore(object): current_context.copy_to(context) return self._new_transaction( - conn, desc, after_callbacks, func, *args, **kwargs + conn, desc, after_callbacks, current_context, + func, *args, **kwargs ) result = yield preserve_context_over_fn( diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py new file mode 100644 index 0000000000..d1829f84e8 --- /dev/null +++ b/synapse/storage/account_data.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import ujson as json +import logging + +logger = logging.getLogger(__name__) + + +class AccountDataStore(SQLBaseStore): + + def get_account_data_for_user(self, user_id): + """Get all the client account_data for a user. + + Args: + user_id(str): The user to get the account_data for. + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_account_data_for_user_txn(txn): + rows = self._simple_select_list_txn( + txn, "account_data", {"user_id": user_id}, + ["account_data_type", "content"] + ) + + global_account_data = { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id}, + ["room_id", "account_data_type", "content"] + ) + + by_room = {} + for row in rows: + room_data = by_room.setdefault(row["room_id"], {}) + room_data[row["account_data_type"]] = json.loads(row["content"]) + + return (global_account_data, by_room) + + return self.runInteraction( + "get_account_data_for_user", get_account_data_for_user_txn + ) + + def get_account_data_for_room(self, user_id, room_id): + """Get all the client account_data for a user for a room. + + Args: + user_id(str): The user to get the account_data for. + room_id(str): The room to get the account_data for. + Returns: + A deferred dict of the room account_data + """ + def get_account_data_for_room_txn(txn): + rows = self._simple_select_list_txn( + txn, "room_account_data", {"user_id": user_id, "room_id": room_id}, + ["account_data_type", "content"] + ) + + return { + row["account_data_type"]: json.loads(row["content"]) for row in rows + } + + return self.runInteraction( + "get_account_data_for_room", get_account_data_for_room_txn + ) + + def get_updated_account_data_for_user(self, user_id, stream_id): + """Get all the client account_data for a that's changed. + + Args: + user_id(str): The user to get the account_data for. + stream_id(int): The point in the stream since which to get updates + Returns: + A deferred pair of a dict of global account_data and a dict + mapping from room_id string to per room account_data dicts. + """ + + def get_updated_account_data_for_user_txn(txn): + sql = ( + "SELECT account_data_type, content FROM account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + global_account_data = { + row[0]: json.loads(row[1]) for row in txn.fetchall() + } + + sql = ( + "SELECT room_id, account_data_type, content FROM room_account_data" + " WHERE user_id = ? AND stream_id > ?" + ) + + txn.execute(sql, (user_id, stream_id)) + + account_data_by_room = {} + for row in txn.fetchall(): + room_account_data = account_data_by_room.setdefault(row[0], {}) + room_account_data[row[1]] = json.loads(row[2]) + + return (global_account_data, account_data_by_room) + + return self.runInteraction( + "get_updated_account_data_for_user", get_updated_account_data_for_user_txn + ) + + @defer.inlineCallbacks + def add_account_data_to_room(self, user_id, room_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + room_id(str): The room to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="room_account_data", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_room_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + @defer.inlineCallbacks + def add_account_data_for_user(self, user_id, account_data_type, content): + """Add some account_data to a room for a user. + Args: + user_id(str): The user to add a tag for. + account_data_type(str): The type of account_data to add. + content(dict): A json object to associate with the tag. + Returns: + A deferred that completes once the account_data has been added. + """ + content_json = json.dumps(content) + + def add_account_data_txn(txn, next_id): + self._simple_upsert_txn( + txn, + table="account_data", + keyvalues={ + "user_id": user_id, + "account_data_type": account_data_type, + }, + values={ + "stream_id": next_id, + "content": content_json, + } + ) + self._update_max_stream_id(txn, next_id) + + with (yield self._account_data_id_gen.get_next(self)) as next_id: + yield self.runInteraction( + "add_user_account_data", add_account_data_txn, next_id + ) + + result = yield self._account_data_id_gen.get_max_token(self) + defer.returnValue(result) + + def _update_max_stream_id(self, txn, next_id): + """Update the max stream_id + + Args: + txn: The database cursor + next_id(int): The the revision to advance to. + """ + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7088f2709b..fc5725097c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -649,7 +649,7 @@ class EventsStore(SQLBaseStore): ] rows = self._new_transaction( - conn, "do_fetch", [], self._fetch_event_rows, event_ids + conn, "do_fetch", [], None, self._fetch_event_rows, event_ids ) row_dict = { diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 9800fd4203..16eff62544 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 26 +SCHEMA_VERSION = 27 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql index f5b2a08ca4..9523d2bcc3 100644 --- a/synapse/storage/schema/delta/15/v15.sql +++ b/synapse/storage/schema/delta/15/v15.sql @@ -1,23 +1,22 @@ -- Drop, copy & recreate pushers table to change unique key -- Also add access_token column at the same time CREATE TABLE IF NOT EXISTS pushers2 ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, - access_token INTEGER DEFAULT NULL, - profile_tag varchar(32) NOT NULL, - kind varchar(8) NOT NULL, - app_id varchar(64) NOT NULL, - app_display_name varchar(64) NOT NULL, - device_display_name varchar(128) NOT NULL, - pushkey blob NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey bytea NOT NULL, ts BIGINT NOT NULL, - lang varchar(8), - data blob, + lang VARCHAR(8), + data bytea, last_token TEXT, last_success BIGINT, failing_since BIGINT, - FOREIGN KEY(user_name) REFERENCES users(name), - UNIQUE (app_id, pushkey, user_name) + UNIQUE (app_id, pushkey) ); INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since) SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers; diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index 5239d69073..ba48e43792 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -38,7 +38,7 @@ CREATE INDEX event_search_ev_ridx ON event_search(room_id); SQLITE_TABLE = ( - "CREATE VIRTUAL TABLE IF NOT EXISTS event_search" + "CREATE VIRTUAL TABLE event_search" " USING fts4 ( event_id, room_id, sender, key, value )" ) diff --git a/synapse/storage/schema/delta/27/account_data.sql b/synapse/storage/schema/delta/27/account_data.sql new file mode 100644 index 0000000000..9f25416005 --- /dev/null +++ b/synapse/storage/schema/delta/27/account_data.sql @@ -0,0 +1,36 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS account_data( + user_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) +); + + +CREATE TABLE IF NOT EXISTS room_account_data( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + account_data_type TEXT NOT NULL, -- The type of the account_data. + stream_id BIGINT NOT NULL, -- The version of the account_data. + content TEXT NOT NULL, -- The JSON content of the account_data + CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) +); + + +CREATE INDEX account_data_stream_id on account_data(user_id, stream_id); +CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id); diff --git a/synapse/storage/schema/delta/26/forgotten_memberships.sql b/synapse/storage/schema/delta/27/forgotten_memberships.sql index beeb8a288b..beeb8a288b 100644 --- a/synapse/storage/schema/delta/26/forgotten_memberships.sql +++ b/synapse/storage/schema/delta/27/forgotten_memberships.sql diff --git a/synapse/storage/schema/delta/26/ts.py b/synapse/storage/schema/delta/27/ts.py index 8d4a981975..8d4a981975 100644 --- a/synapse/storage/schema/delta/26/ts.py +++ b/synapse/storage/schema/delta/27/ts.py diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 20a62d07ff..39f600f53c 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -140,7 +140,10 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [] + + search_query = search_query = _parse_query(self.database_engine, search_term) + + args = [search_query] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -162,7 +165,7 @@ class SearchStore(BackgroundUpdateStore): if isinstance(self.database_engine, PostgresEngine): sql = ( "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id" - " FROM plainto_tsquery('english', ?) as query, event_search" + " FROM to_tsquery('english', ?) as query, event_search" " WHERE vector @@ query" ) elif isinstance(self.database_engine, Sqlite3Engine): @@ -183,7 +186,7 @@ class SearchStore(BackgroundUpdateStore): sql += " ORDER BY rank DESC LIMIT 500" results = yield self._execute( - "search_msgs", self.cursor_to_dict, sql, *([search_term] + args) + "search_msgs", self.cursor_to_dict, sql, *args ) results = filter(lambda row: row["room_id"] in room_ids, results) @@ -197,7 +200,7 @@ class SearchStore(BackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_term, events) + highlights = yield self._find_highlights_in_postgres(search_query, events) defer.returnValue({ "results": [ @@ -226,7 +229,10 @@ class SearchStore(BackgroundUpdateStore): list of dicts """ clauses = [] - args = [search_term] + + search_query = search_query = _parse_query(self.database_engine, search_term) + + args = [search_query] # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. @@ -263,7 +269,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT ts_rank_cd(vector, query) as rank," " origin_server_ts, stream_ordering, room_id, event_id" - " FROM plainto_tsquery('english', ?) as query, event_search" + " FROM to_tsquery('english', ?) as query, event_search" " NATURAL JOIN events" " WHERE vector @@ query AND " ) @@ -313,7 +319,7 @@ class SearchStore(BackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_term, events) + highlights = yield self._find_highlights_in_postgres(search_query, events) defer.returnValue({ "results": [ @@ -330,7 +336,7 @@ class SearchStore(BackgroundUpdateStore): "highlights": highlights, }) - def _find_highlights_in_postgres(self, search_term, events): + def _find_highlights_in_postgres(self, search_query, events): """Given a list of events and a search term, return a list of words that match from the content of the event. @@ -338,7 +344,7 @@ class SearchStore(BackgroundUpdateStore): highlight the matching parts. Args: - search_term (str) + search_query (str) events (list): A list of events Returns: @@ -370,14 +376,14 @@ class SearchStore(BackgroundUpdateStore): while stop_sel in value: stop_sel += ">" - query = "SELECT ts_headline(?, plainto_tsquery('english', ?), %s)" % ( + query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % ( _to_postgres_options({ "StartSel": start_sel, "StopSel": stop_sel, "MaxFragments": "50", }) ) - txn.execute(query, (value, search_term,)) + txn.execute(query, (value, search_query,)) headline, = txn.fetchall()[0] # Now we need to pick the possible highlights out of the haedline @@ -399,3 +405,22 @@ def _to_postgres_options(options_dict): return "'%s'" % ( ",".join("%s=%s" % (k, v) for k, v in options_dict.items()), ) + + +def _parse_query(database_engine, search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. + """ + + # Pull out the individual words, discarding any non-word characters. + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + if isinstance(database_engine, PostgresEngine): + return " & ".join(result + ":*" for result in results) + elif isinstance(database_engine, Sqlite3Engine): + return " & ".join(result + "*" for result in results) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index f6d826cc59..f520f60c6c 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -48,8 +48,8 @@ class TagsStore(SQLBaseStore): Args: user_id(str): The user to get the tags for. Returns: - A deferred dict mapping from room_id strings to lists of tag - strings. + A deferred dict mapping from room_id strings to dicts mapping from + tag strings to tag content. """ deferred = self._simple_select_list( diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index d69c7cb991..2170746025 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -64,8 +64,7 @@ class Clock(object): current_context = LoggingContext.current_context() def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context + with PreserveLoggingContext(current_context): callback(*args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/util/debug.py b/synapse/util/debug.py index f6a5a841a4..b2bee7958f 100644 --- a/synapse/util/debug.py +++ b/synapse/util/debug.py @@ -30,8 +30,7 @@ def debug_deferreds(): context = LoggingContext.current_context() def restore_context_callback(x): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = context + with PreserveLoggingContext(context): return fn(x) return restore_context_callback diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7e6062c1b8..d528ced55a 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -19,6 +19,25 @@ import logging logger = logging.getLogger(__name__) +try: + import resource + + # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined + # to be 1 on linux so we hard code it. + RUSAGE_THREAD = 1 + + # If the system doesn't support RUSAGE_THREAD then this should throw an + # exception. + resource.getrusage(RUSAGE_THREAD) + + def get_thread_resource_usage(): + return resource.getrusage(RUSAGE_THREAD) +except: + # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we + # won't track resource usage by returning None. + def get_thread_resource_usage(): + return None + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a @@ -27,7 +46,9 @@ class LoggingContext(object): name (str): Name for the context for debugging. """ - __slots__ = ["parent_context", "name", "__dict__"] + __slots__ = [ + "parent_context", "name", "usage_start", "usage_end", "main_thread", "__dict__" + ] thread_local = threading.local() @@ -42,11 +63,26 @@ class LoggingContext(object): def copy_to(self, record): pass + def start(self): + pass + + def stop(self): + pass + + def add_database_transaction(self, duration_ms): + pass + sentinel = Sentinel() def __init__(self, name=None): self.parent_context = None self.name = name + self.ru_stime = 0. + self.ru_utime = 0. + self.db_txn_count = 0 + self.db_txn_duration = 0. + self.usage_start = None + self.main_thread = threading.current_thread() def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -56,12 +92,26 @@ class LoggingContext(object): """Get the current logging context from thread local storage""" return getattr(cls.thread_local, "current_context", cls.sentinel) + @classmethod + def set_current_context(cls, context): + """Set the current logging context in thread local storage + Args: + context(LoggingContext): The context to activate. + Returns: + The context that was previously active + """ + current = cls.current_context() + if current is not context: + current.stop() + cls.thread_local.current_context = context + context.start() + return current + def __enter__(self): """Enters this logging context into thread local storage""" if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") - self.parent_context = self.current_context() - self.thread_local.current_context = self + self.parent_context = self.set_current_context(self) return self def __exit__(self, type, value, traceback): @@ -70,16 +120,16 @@ class LoggingContext(object): Returns: None to avoid suppressing any exeptions that were thrown. """ - if self.thread_local.current_context is not self: - if self.thread_local.current_context is self.sentinel: + current = self.set_current_context(self.parent_context) + if current is not self: + if current is self.sentinel: logger.debug("Expected logging context %s has been lost", self) else: logger.warn( "Current logging context %s is not expected context %s", - self.thread_local.current_context, + current, self ) - self.thread_local.current_context = self.parent_context self.parent_context = None def __getattr__(self, name): @@ -93,6 +143,43 @@ class LoggingContext(object): for key, value in self.__dict__.items(): setattr(record, key, value) + record.ru_utime, record.ru_stime = self.get_resource_usage() + + def start(self): + if threading.current_thread() is not self.main_thread: + return + + if self.usage_start and self.usage_end: + self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime + self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime + self.usage_start = None + self.usage_end = None + + if not self.usage_start: + self.usage_start = get_thread_resource_usage() + + def stop(self): + if threading.current_thread() is not self.main_thread: + return + + if self.usage_start: + self.usage_end = get_thread_resource_usage() + + def get_resource_usage(self): + ru_utime = self.ru_utime + ru_stime = self.ru_stime + + if self.usage_start and threading.current_thread() is self.main_thread: + current = get_thread_resource_usage() + ru_utime += current.ru_utime - self.usage_start.ru_utime + ru_stime += current.ru_stime - self.usage_start.ru_stime + + return ru_utime, ru_stime + + def add_database_transaction(self, duration_ms): + self.db_txn_count += 1 + self.db_txn_duration += duration_ms / 1000. + class LoggingContextFilter(logging.Filter): """Logging filter that adds values from the current logging context to each @@ -121,17 +208,20 @@ class PreserveLoggingContext(object): exited. Used to restore the context after a function using @defer.inlineCallbacks is resumed by a callback from the reactor.""" - __slots__ = ["current_context"] + __slots__ = ["current_context", "new_context"] + + def __init__(self, new_context=LoggingContext.sentinel): + self.new_context = new_context def __enter__(self): """Captures the current logging context""" - self.current_context = LoggingContext.current_context() - LoggingContext.thread_local.current_context = LoggingContext.sentinel + self.current_context = LoggingContext.set_current_context( + self.new_context + ) def __exit__(self, type, value, traceback): """Restores the current logging context""" - LoggingContext.thread_local.current_context = self.current_context - + LoggingContext.set_current_context(self.current_context) if self.current_context is not LoggingContext.sentinel: if self.current_context.parent_context is None: logger.warn( @@ -164,8 +254,7 @@ class _PreservingContextDeferred(defer.Deferred): def _wrap_callback(self, f): def g(res, *args, **kwargs): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = self._log_context + with PreserveLoggingContext(self._log_context): res = f(res, *args, **kwargs) return res return g |