From 4d6cb8814e134eba644afeed7bd49df0c7951342 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Aug 2015 09:32:23 +0100 Subject: Speed up event filtering (for ACL) logic --- synapse/handlers/federation.py | 6 +++++- synapse/handlers/message.py | 6 +++++- synapse/handlers/sync.py | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f7155fd8d3..22f534e49a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -230,7 +230,11 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ) ) events_and_states = zip(events, states) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9d6d4f0978..765b14d994 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -138,7 +138,11 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) ) events_and_states = zip(events, states) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6cff6230c1..8f58774b31 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -295,7 +295,11 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) ) events_and_states = zip(events, states) -- cgit 1.5.1 From 07507643cb6a2fde1a87d229f8d77525627a0632 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2015 15:06:51 +0100 Subject: Use dictionary cache to do group -> state fetching --- synapse/handlers/federation.py | 2 +- synapse/state.py | 10 +- synapse/storage/_base.py | 39 +++++--- synapse/storage/state.py | 191 ++++++++++++++++++++++++++------------- synapse/storage/stream.py | 3 +- synapse/util/dictionary_cache.py | 58 +++++++----- tests/test_state.py | 2 +- 7 files changed, 195 insertions(+), 110 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 22f534e49a..90649af9e1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -507,7 +507,7 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups([e]) + self.state_handler.resolve_state_groups(room_id, [e]) for e in event_ids ]) states = dict(zip(event_ids, [s[1] for s in states])) diff --git a/synapse/state.py b/synapse/state.py index 80da90a72c..b5e5d7bbda 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -96,7 +96,7 @@ class StateHandler(object): cache.ts = self.clock.time_msec() state = cache.state else: - res = yield self.resolve_state_groups(event_ids) + res = yield self.resolve_state_groups(room_id, event_ids) state = res[1] if event_type: @@ -155,13 +155,13 @@ class StateHandler(object): if event.is_state(): ret = yield self.resolve_state_groups( - [e for e, _ in event.prev_events], + event.room_id, [e for e, _ in event.prev_events], event_type=event.type, state_key=event.state_key, ) else: ret = yield self.resolve_state_groups( - [e for e, _ in event.prev_events], + event.room_id, [e for e, _ in event.prev_events], ) group, curr_state, prev_state = ret @@ -180,7 +180,7 @@ class StateHandler(object): @defer.inlineCallbacks @log_function - def resolve_state_groups(self, event_ids, event_type=None, state_key=""): + def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. @@ -205,7 +205,7 @@ class StateHandler(object): ) state_groups = yield self.store.get_state_groups( - event_ids + room_id, event_ids ) logger.debug( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7b76ee3b73..803b9d599d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache +from synapse.util.dictionary_cache import DictionaryCache import synapse.metrics from util.id_generators import IdGenerator, StreamIdGenerator @@ -87,23 +88,33 @@ class Cache(object): ) def get(self, *keyargs): - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) + try: + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) - val = self.cache.get(keyargs, self.sentinel) - if val is not self.sentinel: - cache_counter.inc_hits(self.name) - return val + val = self.cache.get(keyargs, self.sentinel) + if val is not self.sentinel: + cache_counter.inc_hits(self.name) + return val - cache_counter.inc_misses(self.name) - raise KeyError() + cache_counter.inc_misses(self.name) + raise KeyError() + except KeyError: + raise + except: + logger.exception("Cache.get failed for %s" % (self.name,)) + raise def update(self, sequence, *args): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - self.prefill(*args) + try: + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + self.prefill(*args) + except: + logger.exception("Cache.update failed for %s" % (self.name,)) + raise def prefill(self, *args): # because I can't *keyargs, value keyargs = args[:-1] @@ -327,6 +338,8 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000) + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 91a5ae86a4..a967b3d44b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -45,52 +45,38 @@ class StateStore(SQLBaseStore): """ @defer.inlineCallbacks - def get_state_groups(self, event_ids): + def get_state_groups(self, room_id, event_ids): """ Get the state groups for the given list of event_ids The return value is a dict mapping group names to lists of events. """ - def f(txn): - groups = set() - for event_id in event_ids: - group = self._simple_select_one_onecol_txn( - txn, - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - retcol="state_group", - allow_none=True, - ) - if group: - groups.add(group) - - res = {} - for group in groups: - state_ids = self._simple_select_onecol_txn( - txn, - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", - ) - - res[group] = state_ids + event_and_groups = yield defer.gatherResults( + [ + self._get_state_group_for_event( + room_id, event_id, + ).addCallback(lambda group, event_id: (event_id, group), event_id) + for event_id in event_ids + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) - return res + groups = set(group for _, group in event_and_groups if group) - states = yield self.runInteraction( - "get_state_groups", - f, - ) - - state_list = yield defer.gatherResults( + group_to_state = yield defer.gatherResults( [ - self._fetch_events_for_group(group, vals) - for group, vals in states.items() + self._get_state_for_group( + group, + ).addCallback(lambda state_dict, group: (group, state_dict), group) + for group in groups ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) - defer.returnValue(dict(state_list)) + defer.returnValue({ + group: state_map.values() + for group, state_map in group_to_state + }) @cached(num_args=1) def _fetch_events_for_group(self, key, events): @@ -207,16 +193,25 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cached(num_args=3, lru=True) - def _get_state_groups_from_group(self, room_id, group, types): + @cached(num_args=2, lru=True, max_entries=10000) + def _get_state_groups_from_group(self, group, types): def f(txn): + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + sql = ( "SELECT event_id FROM state_groups_state WHERE" - " room_id = ? AND state_group = ? AND (%s)" - ) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),) + " state_group = ? %s" + ) % (where_clause,) + + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) - args = [room_id, group] - args.extend([i for typ in types for i in typ]) txn.execute(sql, args) return group, [ @@ -229,7 +224,7 @@ class StateStore(SQLBaseStore): f, ) - @cached(num_args=3, lru=True, max_entries=100000) + @cached(num_args=3, lru=True, max_entries=20000) def _get_state_for_event_id(self, room_id, event_id, types): def f(txn): type_and_state_sql = " OR ".join([ @@ -280,40 +275,33 @@ class StateStore(SQLBaseStore): deferred: A list of dicts corresponding to the event_ids given. The dicts are mappings from (type, state_key) -> state_events """ - set_types = frozenset(types) - res = yield defer.gatherResults( + event_and_groups = yield defer.gatherResults( [ - self._get_state_for_event_id( - room_id, event_id, set_types, - ) + self._get_state_group_for_event( + room_id, event_id, + ).addCallback(lambda group, event_id: (event_id, group), event_id) for event_id in event_ids ], consumeErrors=True, ).addErrback(unwrapFirstError) - event_to_state_ids = dict(res) + groups = set(group for _, group in event_and_groups) - event_dict = yield self._get_events( + res = yield defer.gatherResults( [ - item - for lst in event_to_state_ids.values() - for item in lst + self._get_state_for_group( + group, types + ).addCallback(lambda state_dict, group: (group, state_dict), group) + for group in groups ], - get_prev_content=False - ).addCallback( - lambda evs: {ev.event_id: ev for ev in evs} - ) + consumeErrors=True, + ).addErrback(unwrapFirstError) + + group_to_state = dict(res) event_to_state = { - event_id: { - (ev.type, ev.state_key): ev - for ev in [ - event_dict[state_id] - for state_id in state_ids - if state_id in event_dict - ] - } - for event_id, state_ids in event_to_state_ids.items() + event_id: group_to_state[group] + for event_id, group in event_and_groups } defer.returnValue([ @@ -321,6 +309,79 @@ class StateStore(SQLBaseStore): for event in event_ids ]) + @cached(num_args=2, lru=True, max_entries=100000) + def _get_state_group_for_event(self, room_id, event_id): + return self._simple_select_one_onecol( + table="event_to_state_groups", + keyvalues={ + "event_id": event_id, + }, + retcol="state_group", + allow_none=True, + desc="_get_state_group_for_event", + ) + + @defer.inlineCallbacks + def _get_state_for_group(self, group, types=None): + is_all, state_dict = self._state_group_cache.get(group) + + type_to_key = {} + missing_types = set() + if types is not None: + for typ, state_key in types: + if state_key is None: + type_to_key[typ] = None + missing_types.add((typ, state_key)) + else: + if type_to_key.get(typ, object()) is not None: + type_to_key.setdefault(typ, set()).add(state_key) + + if (typ, state_key) not in state_dict: + missing_types.add((typ, state_key)) + + if is_all and types is None: + defer.returnValue(state_dict) + + if is_all or (types is not None and not missing_types): + def include(typ, state_key): + sentinel = object() + valid_state_keys = type_to_key.get(typ, sentinel) + if valid_state_keys is sentinel: + return False + if valid_state_keys is None: + return True + if state_key in valid_state_keys: + return True + return False + + defer.returnValue({ + k: v + for k, v in state_dict.items() + if include(k[0], k[1]) + }) + + # Okay, so we have some missing_types, lets fetch them. + cache_seq_num = self._state_group_cache.sequence + _, state_ids = yield self._get_state_groups_from_group( + group, + frozenset(types) if types else None + ) + state_events = yield self._get_events(state_ids, get_prev_content=False) + state_dict = { + (e.type, e.state_key): e + for e in state_events + } + + # Update the cache + self._state_group_cache.update( + cache_seq_num, + key=group, + value=state_dict, + full=(types is None), + ) + + defer.returnValue(state_dict) + def _make_group_id(clock): return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index af45fc5619..9db259d5fc 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -300,8 +300,7 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False, from_token=None): + def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): # TODO (erikj): Handle compressed feedback end_token = RoomStreamToken.parse_stream_token(end_token) diff --git a/synapse/util/dictionary_cache.py b/synapse/util/dictionary_cache.py index 0877cc79f6..38b131677c 100644 --- a/synapse/util/dictionary_cache.py +++ b/synapse/util/dictionary_cache.py @@ -16,6 +16,10 @@ from synapse.util.lrucache import LruCache from collections import namedtuple import threading +import logging + + +logger = logging.getLogger(__name__) DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value")) @@ -47,21 +51,25 @@ class DictionaryCache(object): ) def get(self, key, dict_keys=None): - entry = self.cache.get(key, self.sentinel) - if entry is not self.sentinel: - # cache_counter.inc_hits(self.name) - - if dict_keys is None: - return DictionaryEntry(entry.full, dict(entry.value)) - else: - return DictionaryEntry(entry.full, { - k: entry.value[k] - for k in dict_keys - if k in entry.value - }) - - # cache_counter.inc_misses(self.name) - return DictionaryEntry(False, {}) + try: + entry = self.cache.get(key, self.sentinel) + if entry is not self.sentinel: + # cache_counter.inc_hits(self.name) + + if dict_keys is None: + return DictionaryEntry(entry.full, dict(entry.value)) + else: + return DictionaryEntry(entry.full, { + k: entry.value[k] + for k in dict_keys + if k in entry.value + }) + + # cache_counter.inc_misses(self.name) + return DictionaryEntry(False, {}) + except: + logger.exception("get failed") + raise def invalidate(self, key): self.check_thread() @@ -77,14 +85,18 @@ class DictionaryCache(object): self.cache.clear() def update(self, sequence, key, value, full=False): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - if full: - self._insert(key, value) - else: - self._update_or_insert(key, value) + try: + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + if full: + self._insert(key, value) + else: + self._update_or_insert(key, value) + except: + logger.exception("update failed") + raise def _update_or_insert(self, key, value): entry = self.cache.setdefault(key, DictionaryEntry(False, {})) diff --git a/tests/test_state.py b/tests/test_state.py index fea25f7021..5845358754 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -69,7 +69,7 @@ class StateGroupStore(object): self._next_group = 1 - def get_state_groups(self, event_ids): + def get_state_groups(self, room_id, event_ids): groups = {} for event_id in event_ids: group = self._event_to_state_group.get(event_id) -- cgit 1.5.1 From ffdb8c382860ce2e351614a91c2ce07a91c61455 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2015 18:13:48 +0100 Subject: Don't be too enthusiatic with defer.gatherResults --- synapse/handlers/message.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 765b14d994..11c736f727 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -405,10 +405,14 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - yield defer.gatherResults( - [handle_room(e) for e in room_list], - consumeErrors=True - ).addErrback(unwrapFirstError) + # Only do N rooms at once + n = 5 + d_list = [handle_room(e) for e in room_list] + for ds in [d_list[i:i + n] for i in range(0, len(d_list), n)]: + yield defer.gatherResults( + ds, + consumeErrors=True + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, -- cgit 1.5.1 From 1b994a97dd201d0f122a416f28dbbf1136304412 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2015 10:41:40 +0100 Subject: Fix application of ACLs --- synapse/handlers/federation.py | 11 +++++------ synapse/handlers/message.py | 16 ++++++++++++---- synapse/handlers/sync.py | 17 +++++++++++++---- synapse/storage/state.py | 6 +++--- 4 files changed, 33 insertions(+), 17 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 90649af9e1..2bfd0a40e0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -229,7 +229,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): - states = yield self.store.get_state_for_events( + event_to_state = yield self.store.get_state_for_events( room_id, frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), @@ -237,8 +237,6 @@ class FederationHandler(BaseHandler): ) ) - events_and_states = zip(events, states) - def redact_disallowed(event_and_state): event, state = event_and_state @@ -275,9 +273,10 @@ class FederationHandler(BaseHandler): return event - res = map(redact_disallowed, events_and_states) - - logger.info("_filter_events_for_server %r", res) + res = map(redact_disallowed, [ + (e, event_to_state[e.event_id]) + for e in events + ]) defer.returnValue(res) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 11c736f727..95a8f05c05 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -137,7 +137,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): - states = yield self.store.get_state_for_events( + event_id_to_state = yield self.store.get_state_for_events( room_id, frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), @@ -145,7 +145,8 @@ class MessageHandler(BaseHandler): ) ) - events_and_states = zip(events, states) + for ev, state in event_id_to_state.items(): + logger.info("event_id: %r, state: %r", ev, state) def allowed(event_and_state): event, state = event_and_state @@ -179,10 +180,17 @@ class MessageHandler(BaseHandler): return True - events_and_states = filter(allowed, events_and_states) + event_and_state = filter( + allowed, + [ + (e, event_id_to_state[e.event_id]) + for e in events + ] + ) + defer.returnValue([ ev - for ev, _ in events_and_states + for ev, _ in event_and_state ]) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8f58774b31..9a97bff840 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -294,7 +294,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): - states = yield self.store.get_state_for_events( + event_id_to_state = yield self.store.get_state_for_events( room_id, frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), @@ -302,7 +302,8 @@ class SyncHandler(BaseHandler): ) ) - events_and_states = zip(events, states) + for ev, state in event_id_to_state.items(): + logger.info("event_id: %r, state: %r", ev, state) def allowed(event_and_state): event, state = event_and_state @@ -335,10 +336,18 @@ class SyncHandler(BaseHandler): return membership == Membership.INVITE return True - events_and_states = filter(allowed, events_and_states) + + event_and_state = filter( + allowed, + [ + (e, event_id_to_state[e.event_id]) + for e in events + ] + ) + defer.returnValue([ ev - for ev, _ in events_and_states + for ev, _ in event_and_state ]) @defer.inlineCallbacks diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 19b16ed404..a438530071 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -239,10 +239,10 @@ class StateStore(SQLBaseStore): for event_id, group in event_to_groups.items() } - defer.returnValue([ - event_to_state[event] + defer.returnValue({ + event: event_to_state[event] for event in event_ids - ]) + }) @cached(num_args=2, lru=True, max_entries=100000) def _get_state_group_for_event(self, room_id, event_id): -- cgit 1.5.1 From dc8399ee0059bb4ee93fb7c755bc36ade16230a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2015 11:30:59 +0100 Subject: Remove debug loggers --- synapse/handlers/message.py | 3 --- synapse/handlers/sync.py | 3 --- 2 files changed, 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 95a8f05c05..b941312eff 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -145,9 +145,6 @@ class MessageHandler(BaseHandler): ) ) - for ev, state in event_id_to_state.items(): - logger.info("event_id: %r, state: %r", ev, state) - def allowed(event_and_state): event, state = event_and_state diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9a97bff840..d960078e7a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -302,9 +302,6 @@ class SyncHandler(BaseHandler): ) ) - for ev, state in event_id_to_state.items(): - logger.info("event_id: %r, state: %r", ev, state) - def allowed(event_and_state): event, state = event_and_state -- cgit 1.5.1 From f7e2f981ea1feb8461a5bddd9378bd5084833fc0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Aug 2015 16:01:10 +0100 Subject: Use list comprehension instead of filter --- synapse/handlers/message.py | 13 +++---------- synapse/handlers/sync.py | 13 +++---------- 2 files changed, 6 insertions(+), 20 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b941312eff..2c4af8dc97 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -177,17 +177,10 @@ class MessageHandler(BaseHandler): return True - event_and_state = filter( - allowed, - [ - (e, event_id_to_state[e.event_id]) - for e in events - ] - ) - defer.returnValue([ - ev - for ev, _ in event_and_state + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) ]) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d960078e7a..ec8d78ba8c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -334,17 +334,10 @@ class SyncHandler(BaseHandler): return True - event_and_state = filter( - allowed, - [ - (e, event_id_to_state[e.event_id]) - for e in events - ] - ) - defer.returnValue([ - ev - for ev, _ in event_and_state + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) ]) @defer.inlineCallbacks -- cgit 1.5.1 From a7eeb34c64d828539dd6799f2347371a8eabae73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Aug 2015 16:02:05 +0100 Subject: Simplify staggered deferred lists --- synapse/handlers/message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2c4af8dc97..8a9e6cf6ca 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -406,9 +406,9 @@ class MessageHandler(BaseHandler): # Only do N rooms at once n = 5 d_list = [handle_room(e) for e in room_list] - for ds in [d_list[i:i + n] for i in range(0, len(d_list), n)]: + for i in range(0, len(d_list), n): yield defer.gatherResults( - ds, + d_list[i:i + n], consumeErrors=True ).addErrback(unwrapFirstError) -- cgit 1.5.1 From 7b0e7970800df8cedf7966e6fb3837ee233d9ea4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Aug 2015 17:05:24 +0100 Subject: Fix _filter_events_for_client --- synapse/handlers/message.py | 4 +--- synapse/handlers/sync.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8a9e6cf6ca..29e81085d1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -145,9 +145,7 @@ class MessageHandler(BaseHandler): ) ) - def allowed(event_and_state): - event, state = event_and_state - + def allowed(event, state): if event.type == EventTypes.RoomHistoryVisibility: return True diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ec8d78ba8c..7206ae23d7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -302,9 +302,7 @@ class SyncHandler(BaseHandler): ) ) - def allowed(event_and_state): - event, state = event_and_state - + def allowed(event, state): if event.type == EventTypes.RoomHistoryVisibility: return True -- cgit 1.5.1 From 9f7f228ec2e2948c69ca3910d27fffdd2c2fea50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 13 Aug 2015 17:20:59 +0100 Subject: Remove pointless map --- synapse/handlers/federation.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2bfd0a40e0..1e3dccf5a8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -237,9 +237,7 @@ class FederationHandler(BaseHandler): ) ) - def redact_disallowed(event_and_state): - event, state = event_and_state - + def redact_disallowed(event, state): if not state: return event @@ -273,13 +271,11 @@ class FederationHandler(BaseHandler): return event - res = map(redact_disallowed, [ - (e, event_to_state[e.event_id]) + defer.returnValue([ + redact_disallowed(e, event_to_state[e.event_id]) for e in events ]) - defer.returnValue(res) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): -- cgit 1.5.1