diff options
author | Erik Johnston <erik@matrix.org> | 2015-02-10 11:04:37 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-02-10 11:04:37 +0000 |
commit | c86ebe76736a4cde48cfb3229d894a174c6bd379 (patch) | |
tree | afc236e8ef1bdbd6d0686747b99743661522638e /synapse | |
parent | Oops, we do want to defer.return regardless of whether we are caching or not (diff) | |
parent | Merge branch 'release-v0.6.2' of github.com:matrix-org/synapse into develop (diff) | |
download | synapse-c86ebe76736a4cde48cfb3229d894a174c6bd379.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into state-chache
Conflicts: synapse/app/homeserver.py synapse/state.py
Diffstat (limited to '')
-rw-r--r-- | synapse/api/auth.py | 21 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 3 | ||||
-rw-r--r-- | synapse/config/_base.py | 5 | ||||
-rw-r--r-- | synapse/handlers/message.py | 32 | ||||
-rw-r--r-- | synapse/handlers/register.py | 17 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 9 | ||||
-rw-r--r-- | synapse/http/server.py | 23 | ||||
-rw-r--r-- | synapse/push/__init__.py | 35 | ||||
-rw-r--r-- | synapse/push/baserules.py | 12 | ||||
-rw-r--r-- | synapse/rest/client/v1/__init__.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/__init__.py | 2 | ||||
-rw-r--r-- | synapse/rest/media/v1/base_resource.py | 2 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_repository.py | 1 | ||||
-rw-r--r-- | synapse/rest/media/v1/upload_resource.py | 57 | ||||
-rw-r--r-- | synapse/state.py | 2 | ||||
-rw-r--r-- | synapse/storage/_base.py | 56 |
16 files changed, 206 insertions, 73 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 7105ee21dc..0054745363 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -89,12 +89,19 @@ class Auth(object): raise @defer.inlineCallbacks - def check_joined_room(self, room_id, user_id): - member = yield self.state.get_current_state( - room_id=room_id, - event_type=EventTypes.Member, - state_key=user_id - ) + def check_joined_room(self, room_id, user_id, current_state=None): + if current_state: + member = current_state.get( + (EventTypes.Member, user_id), + None + ) + else: + member = yield self.state.get_current_state( + room_id=room_id, + event_type=EventTypes.Member, + state_key=user_id + ) + self._check_joined_room(member, user_id, room_id) defer.returnValue(member) @@ -102,7 +109,7 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) - for event in curr_state: + for event in curr_state.values(): if event.type == EventTypes.Member: try: if UserID.from_string(event.state_key).domain != host: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3a128af5f7..2b17cae54f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -67,7 +67,7 @@ class SynapseHomeServer(HomeServer): return ClientV2AlphaRestResource(self) def build_resource_for_federation(self): - return JsonResource() + return JsonResource(self) def build_resource_for_web_client(self): syweb_path = os.path.dirname(syweb.__file__) @@ -275,6 +275,7 @@ def setup(): hs.get_pusherpool().start() hs.get_state_handler().start_caching() + hs.get_datastore().start_profiling() if config.daemonize: print config.pid_file diff --git a/synapse/config/_base.py b/synapse/config/_base.py index dfc115d8e8..9b0f8c3c32 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -50,8 +50,9 @@ class Config(object): ) return cls.abspath(file_path) - @staticmethod - def ensure_directory(dir_path): + @classmethod + def ensure_directory(cls, dir_path): + dir_path = cls.abspath(dir_path) if not os.path.exists(dir_path): os.makedirs(dir_path) if not os.path.isdir(dir_path): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3f51f38f18..3355adefcf 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -35,6 +35,7 @@ class MessageHandler(BaseHandler): def __init__(self, hs): super(MessageHandler, self).__init__(hs) self.hs = hs + self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() @@ -225,7 +226,9 @@ class MessageHandler(BaseHandler): # TODO: This is duplicating logic from snapshot_all_rooms current_state = yield self.state_handler.get_current_state(room_id) now = self.clock.time_msec() - defer.returnValue([serialize_event(c, now) for c in current_state]) + defer.returnValue( + [serialize_event(c, now) for c in current_state.values()] + ) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, @@ -313,7 +316,7 @@ class MessageHandler(BaseHandler): ) d["state"] = [ serialize_event(c, time_now, as_client_event) - for c in current_state + for c in current_state.values() ] except: logger.exception("Failed to get snapshot") @@ -329,7 +332,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def room_initial_sync(self, user_id, room_id, pagin_config=None, feedback=False): - yield self.auth.check_joined_room(room_id, user_id) + current_state = yield self.state.get_current_state( + room_id=room_id, + ) + + yield self.auth.check_joined_room( + room_id, user_id, + current_state=current_state + ) # TODO(paul): I wish I was called with user objects not user_id # strings... @@ -337,13 +347,12 @@ class MessageHandler(BaseHandler): # TODO: These concurrently time_now = self.clock.time_msec() - state_tuples = yield self.state_handler.get_current_state(room_id) - state = [serialize_event(x, time_now) for x in state_tuples] + state = [ + serialize_event(x, time_now) + for x in current_state.values() + ] - member_event = (yield self.store.get_room_member( - user_id=user_id, - room_id=room_id - )) + member_event = current_state.get((EventTypes.Member, user_id,)) now_token = yield self.hs.get_event_sources().get_current_token() @@ -360,7 +369,10 @@ class MessageHandler(BaseHandler): start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) - room_members = yield self.store.get_room_members(room_id) + room_members = [ + m for m in current_state.values() + if m.type == EventTypes.Member + ] presence_handler = self.hs.get_handlers().presence_handler presence = [] diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 66a89c10b2..4f06c487b1 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -99,6 +99,23 @@ class RegistrationHandler(BaseHandler): raise RegistrationError( 500, "Cannot generate user ID.") + # create a default avatar for the user + # XXX: ideally clients would explicitly specify one, but given they don't + # and we want consistent and pretty identicons for random users, we'll + # do it here. + try: + auth_user = UserID.from_string(user_id) + identicon_resource = self.hs.get_resource_for_media_repository().getChildWithDefault("identicon", None) + upload_resource = self.hs.get_resource_for_media_repository().getChildWithDefault("upload", None) + identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320) + content_uri = yield upload_resource.create_content( + "image/png", None, identicon_bytes, len(identicon_bytes), auth_user + ) + profile_handler = self.hs.get_handlers().profile_handler + profile_handler.set_avatar_url(auth_user, auth_user, ("%s#auto" % content_uri)) + except NotImplementedError: + pass # make tests pass without messing around creating default avatars + defer.returnValue((user_id, token)) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 439164ae39..5af90cc5d1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -175,9 +175,10 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, ) - current_state_events = yield self.state_handler.get_current_state( + current_state = yield self.state_handler.get_current_state( room_id ) + current_state_events = current_state.values() defer.returnValue(RoomSyncResult( room_id=room_id, @@ -347,9 +348,10 @@ class SyncHandler(BaseHandler): # TODO(mjark): This seems racy since this isn't being passed a # token to indicate what point in the stream this is - current_state_events = yield self.state_handler.get_current_state( + current_state = yield self.state_handler.get_current_state( room_id ) + current_state_events = current_state.values() state_at_previous_sync = yield self.get_state_at_previous_sync( room_id, since_token=since_token @@ -431,6 +433,7 @@ class SyncHandler(BaseHandler): joined = True if joined: - state_delta = yield self.state_handler.get_current_state(room_id) + res = yield self.state_handler.get_current_state(room_id) + state_delta = res.values() defer.returnValue(state_delta) diff --git a/synapse/http/server.py b/synapse/http/server.py index 0f6539e1be..6d084fa33c 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -69,9 +69,10 @@ class JsonResource(HttpServer, resource.Resource): _PathEntry = collections.namedtuple("_PathEntry", ["pattern", "callback"]) - def __init__(self): + def __init__(self, hs): resource.Resource.__init__(self) + self.clock = hs.get_clock() self.path_regexs = {} def register_path(self, method, path_pattern, callback): @@ -111,6 +112,7 @@ class JsonResource(HttpServer, resource.Resource): This checks if anyone has registered a callback for that method and path. """ + code = None try: # Just say yes to OPTIONS. if request.method == "OPTIONS": @@ -130,6 +132,13 @@ class JsonResource(HttpServer, resource.Resource): urllib.unquote(u).decode("UTF-8") for u in m.groups() ] + logger.info( + "Received request: %s %s", + request.method, request.path + ) + + start = self.clock.time_msec() + code, response = yield path_entry.callback( request, *args @@ -145,9 +154,11 @@ class JsonResource(HttpServer, resource.Resource): logger.info("%s SynapseError: %s - %s", request, e.code, e.msg) else: logger.exception(e) + + code = e.code self._send_response( request, - e.code, + code, cs_exception(e), response_code_message=e.response_code_message ) @@ -158,6 +169,14 @@ class JsonResource(HttpServer, resource.Resource): 500, {"error": "Internal server error"} ) + finally: + code = str(code) if code else "-" + + end = self.clock.time_msec() + logger.info( + "Processed request: %dms %s %s %s", + end-start, code, request.method, request.path + ) def _send_response(self, request, code, response_json_object, response_code_message=None): diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 7293715293..6f143a5df9 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -22,7 +22,6 @@ import synapse.util.async import baserules import logging -import fnmatch import json import re @@ -130,26 +129,38 @@ class Pusher(object): defer.returnValue(Pusher.DEFAULT_ACTIONS) + @staticmethod + def _glob_to_regexp(glob): + r = re.escape(glob) + r = re.sub(r'\\\*', r'.*?', r) + r = re.sub(r'\\\?', r'.', r) + + # handle [abc], [a-z] and [!a-z] style ranges. + r = re.sub(r'\\\[(\\\!|)(.*)\\\]', + lambda x: ('[%s%s]' % (x.group(1) and '^' or '', + re.sub(r'\\\-', '-', x.group(2)))), r) + return r + def _event_fulfills_condition(self, ev, condition, display_name, room_member_count): if condition['kind'] == 'event_match': if 'pattern' not in condition: logger.warn("event_match condition with no pattern") return False - pat = condition['pattern'] - - if pat.strip("*?[]") == pat: - # no special glob characters so we assume the user means - # 'contains this string' rather than 'is this string' - pat = "*%s*" % (pat,) - + # XXX: optimisation: cache our pattern regexps + if condition['key'] == 'content.body': + r = r'\b%s\b' % self._glob_to_regexp(condition['pattern']) + else: + r = r'^%s$' % self._glob_to_regexp(condition['pattern']) val = _value_for_dotted_key(condition['key'], ev) if val is None: return False - return fnmatch.fnmatch(val.upper(), pat.upper()) + return re.search(r, val, flags=re.IGNORECASE) is not None + elif condition['kind'] == 'device': if 'profile_tag' not in condition: return True return condition['profile_tag'] == self.profile_tag + elif condition['kind'] == 'contains_display_name': # This is special because display names can be different # between rooms and so you can't really hard code it in a rule. @@ -159,9 +170,9 @@ class Pusher(object): return False if not display_name: return False - return fnmatch.fnmatch( - ev['content']['body'].upper(), "*%s*" % (display_name.upper(),) - ) + return re.search("\b%s\b" % re.escape(display_name), + ev['content']['body'], flags=re.IGNORECASE) is not None + elif condition['kind'] == 'room_member_count': if 'is' not in condition: return False diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 8d4b806da6..37878f1e0b 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -4,24 +4,24 @@ def list_with_base_rules(rawrules, user_name): ruleslist = [] # shove the server default rules for each kind onto the end of each - current_prio_class = 1 + current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] for r in rawrules: - if r['priority_class'] > current_prio_class: - while current_prio_class < r['priority_class']: + if r['priority_class'] < current_prio_class: + while r['priority_class'] < current_prio_class: ruleslist.extend(make_base_rules( user_name, PRIORITY_CLASS_INVERSE_MAP[current_prio_class]) ) - current_prio_class += 1 + current_prio_class -= 1 ruleslist.append(r) - while current_prio_class <= PRIORITY_CLASS_INVERSE_MAP.keys()[-1]: + while current_prio_class > 0: ruleslist.extend(make_base_rules( user_name, PRIORITY_CLASS_INVERSE_MAP[current_prio_class]) ) - current_prio_class += 1 + current_prio_class -= 1 return ruleslist diff --git a/synapse/rest/client/v1/__init__.py b/synapse/rest/client/v1/__init__.py index d8d01cdd16..21876b3487 100644 --- a/synapse/rest/client/v1/__init__.py +++ b/synapse/rest/client/v1/__init__.py @@ -25,7 +25,7 @@ class ClientV1RestResource(JsonResource): """A resource for version 1 of the matrix client API.""" def __init__(self, hs): - JsonResource.__init__(self) + JsonResource.__init__(self, hs) self.register_servlets(self, hs) @staticmethod diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index 8f611de3a8..bca65f2a6a 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -25,7 +25,7 @@ class ClientV2AlphaRestResource(JsonResource): """A resource for version 2 alpha of the matrix client API.""" def __init__(self, hs): - JsonResource.__init__(self) + JsonResource.__init__(self, hs) self.register_servlets(self, hs) @staticmethod diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py index 688e7376ad..d44d5f1298 100644 --- a/synapse/rest/media/v1/base_resource.py +++ b/synapse/rest/media/v1/base_resource.py @@ -82,7 +82,7 @@ class BaseMediaResource(Resource): raise SynapseError( 404, "Invalid media id token %r" % (request.postpath,), - Codes.UNKKOWN, + Codes.UNKNOWN, ) @staticmethod diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 61ed90f39f..9ca4d884dd 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -34,6 +34,7 @@ class MediaRepositoryResource(Resource): => POST /_matrix/media/v1/upload HTTP/1.1 Content-Type: <media-type> + Content-Length: <content-length> <media> diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index b939a30e19..5b42782331 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -38,6 +38,35 @@ class UploadResource(BaseMediaResource): def render_OPTIONS(self, request): respond_with_json(request, 200, {}, send_cors=True) return NOT_DONE_YET + + @defer.inlineCallbacks + def create_content(self, media_type, upload_name, content, content_length, auth_user): + media_id = random_string(24) + + fname = self.filepaths.local_media_filepath(media_id) + self._makedirs(fname) + + # This shouldn't block for very long because the content will have + # already been uploaded at this point. + with open(fname, "wb") as f: + f.write(content) + + yield self.store.store_local_media( + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=upload_name, + media_length=content_length, + user_id=auth_user, + ) + media_info = { + "media_type": media_type, + "media_length": content_length, + } + + yield self._generate_local_thumbnails(media_id, media_info) + + defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) @defer.inlineCallbacks def _async_render_POST(self, request): @@ -70,32 +99,10 @@ class UploadResource(BaseMediaResource): # disposition = headers.getRawHeaders("Content-Disposition")[0] # TODO(markjh): parse content-dispostion - media_id = random_string(24) - - fname = self.filepaths.local_media_filepath(media_id) - self._makedirs(fname) - - # This shouldn't block for very long because the content will have - # already been uploaded at this point. - with open(fname, "wb") as f: - f.write(request.content.read()) - - yield self.store.store_local_media( - media_id=media_id, - media_type=media_type, - time_now_ms=self.clock.time_msec(), - upload_name=None, - media_length=content_length, - user_id=auth_user, + content_uri = yield self.create_content( + media_type, None, request.content.read(), + content_length, auth_user ) - media_info = { - "media_type": media_type, - "media_length": content_length, - } - - yield self._generate_local_thumbnails(media_id, media_info) - - content_uri = "mxc://%s/%s" % (self.server_name, media_id) respond_with_json( request, 200, {"content_uri": content_uri}, send_cors=True diff --git a/synapse/state.py b/synapse/state.py index 31f503a1ee..64c58a3934 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -114,7 +114,7 @@ class StateHandler(object): defer.returnValue(state.get((event_type, state_key))) return - defer.returnValue(state.values()) + defer.returnValue(state) @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b350fd61f1..5ddd410607 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -85,6 +85,52 @@ class SQLBaseStore(object): self._db_pool = hs.get_db_pool() self._clock = hs.get_clock() + self._previous_txn_total_time = 0 + self._current_txn_total_time = 0 + self._previous_loop_ts = 0 + self._txn_perf_counters = {} + self._previous_txn_perf_counters = {} + + def start_profiling(self): + self._previous_loop_ts = self._clock.time_msec() + + def loop(): + curr = self._current_txn_total_time + prev = self._previous_txn_total_time + self._previous_txn_total_time = curr + + time_now = self._clock.time_msec() + time_then = self._previous_loop_ts + self._previous_loop_ts = time_now + + ratio = (curr - prev)/(time_now - time_then) + + txn_counters = [] + for name, (count, cum_time) in self._txn_perf_counters.items(): + prev_count, prev_time = self._previous_txn_perf_counters.get( + name, (0,0) + ) + txn_counters.append(( + (cum_time - prev_time) / (time_now - time_then), + count - prev_count, + name + )) + + self._previous_txn_perf_counters = dict(self._txn_perf_counters) + + txn_counters.sort(reverse=True) + top_three_counters = ", ".join( + "%s(%d): %.3f%%" % (name, count, 100 * ratio) + for ratio, count, name in txn_counters[:3] + ) + + logger.info( + "Total database time: %.3f%% {%s}", + ratio * 100, top_three_counters + ) + + self._clock.looping_call(loop, 10000) + @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" @@ -94,7 +140,7 @@ class SQLBaseStore(object): with LoggingContext("runInteraction") as context: current_context.copy_to(context) start = time.time() * 1000 - txn_id = SQLBaseStore._TXN_ID + txn_id = self._TXN_ID # We don't really need these to be unique, so lets stop it from # growing really large. @@ -114,6 +160,14 @@ class SQLBaseStore(object): "[TXN END] {%s} %f", name, end - start ) + + self._current_txn_total_time += end - start + + count, cum_time = self._txn_perf_counters.get(desc, (0,0)) + count += 1 + cum_time += end - start + self._txn_perf_counters[desc] = (count, cum_time) + with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( inner_func, *args, **kwargs |