From 3c5f25507b7a62165a782420add4f9dedcec0b3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 13:55:16 +0000 Subject: Yield on EDU handling --- synapse/federation/federation_server.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e8bfbe7cb5..a961b17aea 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -137,8 +137,8 @@ class FederationServer(FederationBase): logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( + for edu in (Edu(**x) for x in transaction.edus): + yield self.received_edu( transaction.origin, edu.edu_type, edu.content @@ -161,11 +161,12 @@ class FederationServer(FederationBase): ) defer.returnValue((200, response)) + @defer.inlineCallbacks def received_edu(self, origin, edu_type, content): received_edus_counter.inc() if edu_type in self.edu_handlers: - self.edu_handlers[edu_type](origin, content) + yield self.edu_handlers[edu_type](origin, content) else: logger.warn("Received EDU of type %s with no handler", edu_type) -- cgit 1.4.1 From 9adf0e92bc3dbe4305beaf602406fc5ca51ea37e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 15:12:50 +0000 Subject: Catch exceptions from EDU handling --- synapse/federation/federation_server.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a961b17aea..76820b924b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -166,7 +166,12 @@ class FederationServer(FederationBase): received_edus_counter.inc() if edu_type in self.edu_handlers: - yield self.edu_handlers[edu_type](origin, content) + try: + yield self.edu_handlers[edu_type](origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception as e: + logger.exception("Failed to handle edu %r", edu_type, e) else: logger.warn("Received EDU of type %s with no handler", edu_type) -- cgit 1.4.1 From 5244c0b48ebb86273bbb79a1935cf5893aa6f310 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Mar 2016 18:03:08 +0000 Subject: Remove unused backfilled parameter from persist_event --- synapse/federation/federation_server.py | 1 - synapse/handlers/federation.py | 38 +++++++++++++-------------------- synapse/storage/events.py | 22 ++++--------------- 3 files changed, 19 insertions(+), 42 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 76820b924b..429ab6ddec 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -531,7 +531,6 @@ class FederationServer(FederationBase): yield self.handler.on_receive_pdu( origin, pdu, - backfilled=False, state=state, auth_chain=auth_chain, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c172877bde..267fedf114 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -102,7 +102,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, backfilled, state=None, + def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. @@ -185,7 +185,6 @@ class FederationHandler(BaseHandler): origin, event, state=state, - backfilled=backfilled, ) except AuthError as e: raise FederationError( @@ -214,18 +213,17 @@ class FederationHandler(BaseHandler): except StoreError: logger.exception("Failed to store room.") - if not backfilled: - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + with PreserveLoggingContext(): + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) if event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -645,7 +643,7 @@ class FederationHandler(BaseHandler): continue try: - self.on_receive_pdu(origin, p, backfilled=False) + self.on_receive_pdu(origin, p) except: logger.exception("Couldn't handle pdu") @@ -777,7 +775,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=False, ) target_user = UserID.from_string(event.state_key) @@ -817,7 +814,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=False, ) target_user = UserID.from_string(event.state_key) @@ -1072,8 +1068,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, backfilled=False, - current_state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() @@ -1083,7 +1078,7 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not backfilled and not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( event, context, self @@ -1092,9 +1087,7 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=backfilled, - is_new_state=(not outlier and not backfilled), - current_state=current_state, + is_new_state=not outlier, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1192,7 +1185,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, new_event_context, - backfilled=False, is_new_state=True, current_state=state, ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 285c586cfe..e444b64cee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, backfilled=False, + def persist_event(self, event, context, is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - self.min_stream_token -= 1 - stream_ordering = self.min_stream_token - - if stream_ordering is None: - stream_ordering_manager = self._stream_id_gen.get_next() - else: - @contextmanager - def stream_ordering_manager(): - yield stream_ordering - stream_ordering_manager = stream_ordering_manager() - try: - with stream_ordering_manager as stream_ordering: + with self._stream_id_gen.get_next() as stream_ordering: event.internal_metadata.stream_ordering = stream_ordering yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, context=context, - backfilled=backfilled, is_new_state=is_new_state, current_state=current_state, ) @@ -166,7 +152,7 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) @log_function - def _persist_event_txn(self, txn, event, context, backfilled, + def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table @@ -198,7 +184,7 @@ class EventsStore(SQLBaseStore): return self._persist_events_txn( txn, [(event, context)], - backfilled=backfilled, + backfilled=False, is_new_state=is_new_state, ) -- cgit 1.4.1 From acdfef7b1443a8260c43e31e9944b74dfdf286dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:13:05 +0000 Subject: Intern all the things --- synapse/events/__init__.py | 11 ++----- synapse/federation/federation_client.py | 1 + synapse/federation/transport/server.py | 28 ++++++++-------- synapse/http/server.py | 10 +++--- synapse/storage/_base.py | 3 +- synapse/storage/receipts.py | 21 ++++++------ synapse/storage/state.py | 10 +++--- synapse/util/caches/__init__.py | 58 ++++++++++++++++++++++++++++++++- 8 files changed, 97 insertions(+), 45 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 63004eaf04..23f8b612ae 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.util.frozenutils import freeze -from synapse.util.caches import intern_string +from synapse.util.caches import intern_dict # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -143,14 +143,7 @@ class FrozenEvent(EventBase): # We intern these strings because they turn up a lot (especially when # caching). - event_dict["type"] = intern_string(event_dict["type"]) - if "state_key" in event_dict: - event_dict["state_key"] = intern_string(event_dict["state_key"]) - if "sender" in event_dict: - event_dict["sender"] = intern_string(event_dict["sender"]) - - event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) - event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) + event_dict = intern_dict(event_dict) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83c1f46586..37ee469fa2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -418,6 +418,7 @@ class FederationClient(FederationBase): "Failed to make_%s via %s: %s", membership, destination, e.message ) + raise raise RuntimeError("Failed to send to any server.") diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 208bff8d4f..d65a7893d8 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -175,7 +175,7 @@ class BaseFederationServlet(object): class FederationSendServlet(BaseFederationServlet): - PATH = "/send/([^/]*)/" + PATH = "/send/(?P[^/]*)/" def __init__(self, handler, server_name, **kwargs): super(FederationSendServlet, self).__init__( @@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet): - PATH = "/event/([^/]*)/" + PATH = "/event/(?P[^/]*)/" # This is when someone asks for a data item for a given server data_id pair. def on_GET(self, origin, content, query, event_id): @@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet): - PATH = "/state/([^/]*)/" + PATH = "/state/(?P[^/]*)/" # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): @@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet): - PATH = "/backfill/([^/]*)/" + PATH = "/backfill/(?P[^/]*)/" def on_GET(self, origin, content, query, context): versions = query["v"] @@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet): class FederationQueryServlet(BaseFederationServlet): - PATH = "/query/([^/]*)" + PATH = "/query/(?P[^/]*)" # This is when we receive a server-server Query def on_GET(self, origin, content, query, query_type): @@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet): - PATH = "/make_join/([^/]*)/([^/]*)" + PATH = "/make_join/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet): class FederationMakeLeaveServlet(BaseFederationServlet): - PATH = "/make_leave/([^/]*)/([^/]*)" + PATH = "/make_leave/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet): class FederationSendLeaveServlet(BaseFederationServlet): - PATH = "/send_leave/([^/]*)/([^/]*)" + PATH = "/send_leave/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id, txid): @@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth/([^/]*)/([^/]*)" + PATH = "/event_auth(?P[^/]*)/(?P[^/]*)" def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) class FederationSendJoinServlet(BaseFederationServlet): - PATH = "/send_join/([^/]*)/([^/]*)" + PATH = "/send_join/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet): class FederationInviteServlet(BaseFederationServlet): - PATH = "/invite/([^/]*)/([^/]*)" + PATH = "/invite/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet): class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): - PATH = "/exchange_third_party_invite/([^/]*)" + PATH = "/exchange_third_party_invite/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id): @@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet): class FederationQueryAuthServlet(BaseFederationServlet): - PATH = "/query_auth/([^/]*)/([^/]*)" + PATH = "/query_auth/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): @@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): class FederationGetMissingEventsServlet(BaseFederationServlet): # TODO(paul): Why does this path alone end with "/?" optional? - PATH = "/get_missing_events/([^/]*)/?" + PATH = "/get_missing_events/(?P[^/]*)/?" @defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): diff --git a/synapse/http/server.py b/synapse/http/server.py index b17b190ee5..b82196fd5e 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes ) from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.caches import intern_dict import synapse.metrics import synapse.events @@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource): else: servlet_classname = "%r" % callback - args = [ - urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups() - ] + kwargs = intern_dict({ + name: urllib.unquote(value).decode("UTF-8") if value else value + for name, value in m.groupdict().items() + }) - callback_return = yield callback(request, *args) + callback_return = yield callback(request, **kwargs) if callback_return is not None: code, response = callback_return self._send_response(request, code, response) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 583b77a835..b75b79df36 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache +from synapse.util.caches import intern_dict import synapse.metrics @@ -350,7 +351,7 @@ class SQLBaseStore(object): """ col_headers = list(column[0] for column in cursor.description) results = list( - dict(zip(col_headers, row)) for row in cursor.fetchall() + intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall() ) return results diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index dbc074d6b5..6b9d848eaa 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): - def f(txn): - sql = ( - "SELECT room_id,event_id " - "FROM receipts_linearized " - "WHERE user_id = ? AND receipt_type = ? " - ) - txn.execute(sql, (user_id, receipt_type)) - return txn.fetchall() + rows = yield self._simple_select_list( + table="receipts_linearized", + keyvalues={ + "user_id": user_id, + "receipt_type": receipt_type, + }, + retcols=("room_id", "event_id"), + desc="get_receipts_for_user", + ) - defer.returnValue(dict( - (yield self.runInteraction("get_receipts_for_user", f)) - )) + defer.returnValue({row["room_id"]: row["event_id"] for row in rows}) @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1982b1c607..03eecbbbb6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -156,9 +156,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key( - room_id, intern_string(event_type), intern_string(state_key) - ) + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -205,7 +203,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (intern_string(row["type"]), intern_string(row["state_key"])) + key = (row["type"], row["state_key"]) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -286,7 +284,9 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) + defer.returnValue({ + intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows + }) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 9d450fade5..838cec45fe 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -15,6 +15,9 @@ import synapse.metrics from lrucache import LruCache +import os + +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) DEBUG_CACHES = False @@ -27,9 +30,62 @@ cache_counter = metrics.register_cache( labels=["name"], ) -_string_cache = LruCache(5000) +_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache +KNOWN_KEYS = { + key: key for key in + ( + "auth_events", + "content", + "depth", + "event_id", + "hashes", + "origin", + "origin_server_ts", + "prev_events", + "room_id", + "sender", + "signatures", + "state_key", + "type", + "unsigned", + "user_id", + ) +} + + def intern_string(string): + """Takes a (potentially) unicode string and interns using custom cache + """ return _string_cache.setdefault(string, string) + + +def intern_dict(dictionary): + """Takes a dictionary and interns well known keys and their values + """ + return _intern_known_values({ + _intern_key(key): value for key, value in dictionary.items() + }) + + +def _intern_known_values(dictionary): + intern_str_keys = ("event_id", "room_id") + intern_unicode_keys = ("sender", "user_id", "type", "state_key") + + for key in intern_str_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern(val.encode('ascii')) + + for key in intern_unicode_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern_string(val) + + return dictionary + + +def _intern_key(key): + return KNOWN_KEYS.get(key, key) -- cgit 1.4.1