From 9b334b3f97057ac145622d2e4d0ad036ef27b468 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 11 Mar 2018 20:01:41 +0000 Subject: WIP experiment in lazyloading room members --- synapse/handlers/sync.py | 43 +++++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0f713ce038..809e9fece9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -399,7 +399,7 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event): + def get_state_after_event(self, event, types=None): """ Get the room state after the given event @@ -409,14 +409,14 @@ class SyncHandler(object): Returns: A Deferred map from ((type, state_key)->Event) """ - state_ids = yield self.store.get_state_ids_for_event(event.event_id) + state_ids = yield self.store.get_state_ids_for_event(event.event_id, types) if event.is_state(): state_ids = state_ids.copy() state_ids[(event.type, event.state_key)] = event.event_id defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position): + def get_state_at(self, room_id, stream_position, types=None): """ Get the room state at a particular stream position Args: @@ -432,7 +432,7 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] - state = yield self.get_state_after_event(last_event) + state = yield self.get_state_after_event(last_event, types) else: # no events in this room - so presumably no state @@ -441,7 +441,7 @@ class SyncHandler(object): @defer.inlineCallbacks def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, - full_state): + full_state, filter_members): """ Works out the differnce in state between the start of the timeline and the previous sync. @@ -454,6 +454,8 @@ class SyncHandler(object): be None. now_token(str): Token of the end of the current batch. full_state(bool): Whether to force returning the full state. + filter_members(bool): Whether to only return state for members + referenced in this timeline segment Returns: A deferred new event dictionary @@ -464,18 +466,35 @@ class SyncHandler(object): # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): + + types = None + if filter_members: + # We only request state for the members needed to display the + # timeline: + types = ( + (EventTypes.Member, state_key) + for state_key in set( + event.sender # FIXME: we also care about targets etc. + for event in batch.events + ) + ) + types.append((None, None)) # don't just filter to room members + + # TODO: we should opportunistically deduplicate these members too + # within the same sync series (based on an in-memory cache) + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id + batch.events[-1].event_id, types=types ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id + batch.events[0].event_id, types=types ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token + room_id, stream_position=now_token, types=types ) state_ids = current_state_ids @@ -493,15 +512,15 @@ class SyncHandler(object): ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token + room_id, stream_position=since_token, types=types ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id + batch.events[-1].event_id, types=types ) state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id + batch.events[0].event_id, types=types ) timeline_state = { @@ -1325,7 +1344,7 @@ class SyncHandler(object): state = yield self.compute_state_delta( room_id, batch, sync_config, since_token, now_token, - full_state=full_state + full_state=full_state, filter_members=True ) if room_builder.rtype == "joined": -- cgit 1.5.1 From 87133652657c5073616419b0afc533eac6ae6750 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 11 Mar 2018 20:10:25 +0000 Subject: typos --- synapse/handlers/sync.py | 4 ++-- synapse/storage/state.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 809e9fece9..fa730ca760 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -471,13 +471,13 @@ class SyncHandler(object): if filter_members: # We only request state for the members needed to display the # timeline: - types = ( + types = [ (EventTypes.Member, state_key) for state_key in set( event.sender # FIXME: we also care about targets etc. for event in batch.events ) - ) + ] types.append((None, None)) # don't just filter to room members # TODO: we should opportunistically deduplicate these members too diff --git a/synapse/storage/state.py b/synapse/storage/state.py index da6bb685fa..0238200286 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -301,6 +301,8 @@ class StateGroupWorkerStore(SQLBaseStore): args = [next_group] if types: args.extend(i for typ in types for i in typ) + if include_other_types: + args.extend(typ for (typ, _) in types) txn.execute( "SELECT type, state_key, event_id FROM state_groups_state" -- cgit 1.5.1 From 14a9d2f73d50225f190f42e270cbf9ef7447bd8c Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 22:03:42 +0000 Subject: ensure we always include the members for a given timeline block --- synapse/handlers/sync.py | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index fa730ca760..c754cfdeeb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -468,6 +468,8 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): types = None + member_state_ids = {} + if filter_members: # We only request state for the members needed to display the # timeline: @@ -492,6 +494,13 @@ class SyncHandler(object): state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types ) + + if filter_members: + member_state_ids = { + t: state_ids[t] + for t in state_ids if t[0] == EventTypes.member + } + else: current_state_ids = yield self.get_state_at( room_id, stream_position=now_token, types=types @@ -499,6 +508,12 @@ class SyncHandler(object): state_ids = current_state_ids + if filter_members: + member_state_ids = { + t: state_ids[t] + for t in state_ids if t[0] == EventTypes.member + } + timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -507,6 +522,7 @@ class SyncHandler(object): state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, + timeline_start_members=member_state_ids, previous={}, current=current_state_ids, ) @@ -523,6 +539,12 @@ class SyncHandler(object): batch.events[0].event_id, types=types ) + if filter_members: + member_state_ids = { + t: state_at_timeline_start[t] + for t in state_ids if t[0] == EventTypes.member + } + timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -531,6 +553,7 @@ class SyncHandler(object): state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, + timeline_start_members=member_state_ids, previous=state_at_previous_sync, current=current_state_ids, ) @@ -1440,12 +1463,16 @@ def _action_has_highlight(actions): return False -def _calculate_state(timeline_contains, timeline_start, previous, current): +def _calculate_state(timeline_contains, timeline_start, timeline_start_members, + previous, current): """Works out what state to include in a sync response. Args: timeline_contains (dict): state in the timeline timeline_start (dict): state at the start of the timeline + timeline_start_members (dict): state at the start of the timeline + for room members who participate in this chunk of timeline. + Should always be a subset of timeline_start. previous (dict): state at the end of the previous sync (or empty dict if this is an initial sync) current (dict): state at the end of the timeline @@ -1464,11 +1491,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): } c_ids = set(e for e in current.values()) - tc_ids = set(e for e in timeline_contains.values()) - p_ids = set(e for e in previous.values()) ts_ids = set(e for e in timeline_start.values()) + tsm_ids = set(e for e in timeline_start_members.values()) + p_ids = set(e for e in previous.values()) + tc_ids = set(e for e in timeline_contains.values()) - state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids + state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | tsm_ids return { event_id_to_key[e]: e for e in state_ids -- cgit 1.5.1 From ccca02846d07124f537b0c475308f9a26bfb3fb1 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 22:31:41 +0000 Subject: make it work --- synapse/handlers/sync.py | 6 +++--- synapse/storage/state.py | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c754cfdeeb..c05e3d107f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -498,7 +498,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_ids[t] - for t in state_ids if t[0] == EventTypes.member + for t in state_ids if t[0] == EventTypes.Member } else: @@ -511,7 +511,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_ids[t] - for t in state_ids if t[0] == EventTypes.member + for t in state_ids if t[0] == EventTypes.Member } timeline_state = { @@ -542,7 +542,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_at_timeline_start[t] - for t in state_ids if t[0] == EventTypes.member + for t in state_ids if t[0] == EventTypes.Member } timeline_state = { diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 4291cde7ab..9c9994c073 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -257,10 +257,11 @@ class StateGroupWorkerStore(SQLBaseStore): if include_other_types: # XXX: check whether this slows postgres down like a list of # ORs does too? + unique_types = set([ t for (t, _) in types ]) clause_to_args.append( ( - "AND type <> ? " * len(types), - [t for (t, _) in types] + "AND type <> ? " * len(unique_types), + list(unique_types) ) ) else: @@ -293,10 +294,11 @@ class StateGroupWorkerStore(SQLBaseStore): where_args.extend([typ[0], typ[1]]) if include_other_types: + unique_types = set([ t for (t, _) in types ]) where_clauses.append( - "(" + " AND ".join(["type <> ?"] * len(types)) + ")" + "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" ) - where_args.extend(t for (t, _) in types) + where_args.extend(list(unique_types)) where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: -- cgit 1.5.1 From c9d72e4571752554dfe01d755ae23f55c5f84ade Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 23:46:45 +0000 Subject: oops --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c05e3d107f..887624c431 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -542,7 +542,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_at_timeline_start[t] - for t in state_ids if t[0] == EventTypes.Member + for t in state_at_timeline_start if t[0] == EventTypes.Member } timeline_state = { -- cgit 1.5.1 From 4d0cfef6ee023bfe83113a0378321830ebde1619 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 14 Mar 2018 00:02:20 +0000 Subject: add copyright to nudge CI --- synapse/handlers/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 887624c431..edbd2ae771 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2015 - 2016 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.5.1 From 3bc5bd2d22e6b53ec1f89760301df1517e71b53a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 16 Mar 2018 00:52:04 +0000 Subject: make incr syncs work --- synapse/handlers/sync.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index edbd2ae771..84c894ca4a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -499,7 +499,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_ids[t] - for t in state_ids if t[0] == EventTypes.Member + for t in state_ids if state_ids[t][0] == EventTypes.Member } else: @@ -512,7 +512,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_ids[t] - for t in state_ids if t[0] == EventTypes.Member + for t in state_ids if state_ids[t][0] == EventTypes.Member } timeline_state = { @@ -543,7 +543,8 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_at_timeline_start[t] - for t in state_at_timeline_start if t[0] == EventTypes.Member + for t in state_at_timeline_start + if state_at_timeline_start[t][0] == EventTypes.Member } timeline_state = { -- cgit 1.5.1 From 5b3b3aada8952b53f82723227c9758ed47450a2e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 16 Mar 2018 01:17:34 +0000 Subject: simplify timeline_start_members --- synapse/handlers/sync.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 84c894ca4a..ffb4f7915e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -496,12 +496,6 @@ class SyncHandler(object): batch.events[0].event_id, types=types ) - if filter_members: - member_state_ids = { - t: state_ids[t] - for t in state_ids if state_ids[t][0] == EventTypes.Member - } - else: current_state_ids = yield self.get_state_at( room_id, stream_position=now_token, types=types @@ -509,11 +503,13 @@ class SyncHandler(object): state_ids = current_state_ids - if filter_members: - member_state_ids = { - t: state_ids[t] - for t in state_ids if state_ids[t][0] == EventTypes.Member - } + if filter_members: + logger.info("Finding members from %r", state_ids) + member_state_ids = { + e: state_ids[e] + for e in state_ids if state_ids[e][0] == EventTypes.Member + } + logger.info("Found members %r", member_state_ids) timeline_state = { (event.type, event.state_key): event.event_id @@ -541,11 +537,14 @@ class SyncHandler(object): ) if filter_members: + logger.info("Finding members from %r", state_at_timeline_start) member_state_ids = { - t: state_at_timeline_start[t] - for t in state_at_timeline_start - if state_at_timeline_start[t][0] == EventTypes.Member + e: state_at_timeline_start[e] + for e in state_at_timeline_start + if state_at_timeline_start[e][0] == EventTypes.Member } + logger.info("Found members %r", member_state_ids) + timeline_state = { (event.type, event.state_key): event.event_id -- cgit 1.5.1 From f7dcc404f216383bfd62e4611c6a28c3f13576dc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 16 Mar 2018 01:37:53 +0000 Subject: add state_ids for timeline entries --- synapse/handlers/sync.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ffb4f7915e..9b7e598e74 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -474,6 +474,7 @@ class SyncHandler(object): if filter_members: # We only request state for the members needed to display the # timeline: + types = [ (EventTypes.Member, state_key) for state_key in set( @@ -481,11 +482,14 @@ class SyncHandler(object): for event in batch.events ) ] - types.append((None, None)) # don't just filter to room members - # TODO: we should opportunistically deduplicate these members too + # TODO: we should opportunistically deduplicate these members here # within the same sync series (based on an in-memory cache) + if not types: + filter_members = False + types.append((None, None)) # don't just filter to room members + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( @@ -545,7 +549,6 @@ class SyncHandler(object): } logger.info("Found members %r", member_state_ids) - timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -559,7 +562,14 @@ class SyncHandler(object): current=current_state_ids, ) else: - state_ids = {} + if filter_members: + # strip off the (None, None) and filter to just room members + types = types[:-1] + state_ids = yield self.store.get_state_ids_for_event( + batch.events[0].event_id, types=types + ) + else: + state_ids = {} state = {} if state_ids: -- cgit 1.5.1 From 4f0493c850d4611e8ada42c1de54a18e8dc15a37 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 16 Mar 2018 01:43:37 +0000 Subject: fix tsm search again --- synapse/handlers/sync.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9b7e598e74..4bf85a128f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -510,8 +510,8 @@ class SyncHandler(object): if filter_members: logger.info("Finding members from %r", state_ids) member_state_ids = { - e: state_ids[e] - for e in state_ids if state_ids[e][0] == EventTypes.Member + t: state_ids[t] + for t in state_ids if t[0] == EventTypes.Member } logger.info("Found members %r", member_state_ids) @@ -543,9 +543,8 @@ class SyncHandler(object): if filter_members: logger.info("Finding members from %r", state_at_timeline_start) member_state_ids = { - e: state_at_timeline_start[e] - for e in state_at_timeline_start - if state_at_timeline_start[e][0] == EventTypes.Member + t: state_at_timeline_start[t] + for t in state_at_timeline_start if t[0] == EventTypes.Member } logger.info("Found members %r", member_state_ids) -- cgit 1.5.1 From fc5397fdf5acefd33bd3b808b6d8cc7c31b69b55 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 16 Mar 2018 01:44:55 +0000 Subject: remove debug --- synapse/handlers/sync.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4bf85a128f..b7f42bd594 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -508,12 +508,10 @@ class SyncHandler(object): state_ids = current_state_ids if filter_members: - logger.info("Finding members from %r", state_ids) member_state_ids = { t: state_ids[t] for t in state_ids if t[0] == EventTypes.Member } - logger.info("Found members %r", member_state_ids) timeline_state = { (event.type, event.state_key): event.event_id @@ -541,12 +539,10 @@ class SyncHandler(object): ) if filter_members: - logger.info("Finding members from %r", state_at_timeline_start) member_state_ids = { t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member } - logger.info("Found members %r", member_state_ids) timeline_state = { (event.type, event.state_key): event.event_id -- cgit 1.5.1 From 366f730bf697fe8fbb18a509ec1852987bc80410 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 18 Mar 2018 21:40:35 +0000 Subject: only get member state IDs for incremental syncs if we're filtering --- synapse/handlers/sync.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b7f42bd594..6b57afd97b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -557,14 +557,14 @@ class SyncHandler(object): current=current_state_ids, ) else: + state_ids = {} if filter_members: # strip off the (None, None) and filter to just room members types = types[:-1] - state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types - ) - else: - state_ids = {} + if types: + state_ids = yield self.store.get_state_ids_for_event( + batch.events[0].event_id, types=types + ) state = {} if state_ids: -- cgit 1.5.1 From 478af0f72005708dbbed23e30c547c3d66c07c0e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 19 Mar 2018 01:00:12 +0000 Subject: reshuffle todo & comments --- synapse/handlers/sync.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6b57afd97b..76f5057377 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -483,11 +483,15 @@ class SyncHandler(object): ) ] - # TODO: we should opportunistically deduplicate these members here - # within the same sync series (based on an in-memory cache) + # We can't remove redundant member types at this stage as it has + # to be done based on event_id, and we don't have the member + # event ids until we've pulled them out of the DB. if not types: + # an optimisation to stop needlessly trying to calculate + # member_state_ids filter_members = False + types.append((None, None)) # don't just filter to room members if full_state: @@ -559,6 +563,10 @@ class SyncHandler(object): else: state_ids = {} if filter_members: + # TODO: filter out redundant members based on their mxids (not their + # event_ids) at this point. We know we can do it based on mxid as this + # is an non-gappy incremental sync. + # strip off the (None, None) and filter to just room members types = types[:-1] if types: -- cgit 1.5.1 From b2f22829475ccfe19e994aedddb8d04995018bf4 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 19 Mar 2018 01:15:13 +0000 Subject: make lazy_load_members configurable in filters --- synapse/api/filtering.py | 6 ++++++ synapse/handlers/sync.py | 18 +++++++++--------- 2 files changed, 15 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 83206348e5..339e4a31d6 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -260,6 +260,9 @@ class FilterCollection(object): def ephemeral_limit(self): return self._room_ephemeral_filter.limit() + def lazy_load_members(self): + return self._room_state_filter.lazy_load_members() + def filter_presence(self, events): return self._presence_filter.filter(events) @@ -416,6 +419,9 @@ class Filter(object): def limit(self): return self.filter_json.get("limit", 10) + def lazy_load_members(self): + return self.filter_json.get("lazy_load_members", False) + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 76f5057377..f521d22e91 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -442,7 +442,7 @@ class SyncHandler(object): @defer.inlineCallbacks def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, - full_state, filter_members): + full_state): """ Works out the differnce in state between the start of the timeline and the previous sync. @@ -455,7 +455,7 @@ class SyncHandler(object): be None. now_token(str): Token of the end of the current batch. full_state(bool): Whether to force returning the full state. - filter_members(bool): Whether to only return state for members + lazy_load_members(bool): Whether to only return state for members referenced in this timeline segment Returns: @@ -470,8 +470,9 @@ class SyncHandler(object): types = None member_state_ids = {} + lazy_load_members = sync_config.filter_collection.lazy_load_members() - if filter_members: + if lazy_load_members: # We only request state for the members needed to display the # timeline: @@ -490,7 +491,7 @@ class SyncHandler(object): if not types: # an optimisation to stop needlessly trying to calculate # member_state_ids - filter_members = False + lazy_load_members = False types.append((None, None)) # don't just filter to room members @@ -511,7 +512,7 @@ class SyncHandler(object): state_ids = current_state_ids - if filter_members: + if lazy_load_members: member_state_ids = { t: state_ids[t] for t in state_ids if t[0] == EventTypes.Member @@ -542,7 +543,7 @@ class SyncHandler(object): batch.events[0].event_id, types=types ) - if filter_members: + if lazy_load_members: member_state_ids = { t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member @@ -562,7 +563,7 @@ class SyncHandler(object): ) else: state_ids = {} - if filter_members: + if lazy_load_members: # TODO: filter out redundant members based on their mxids (not their # event_ids) at this point. We know we can do it based on mxid as this # is an non-gappy incremental sync. @@ -1380,8 +1381,7 @@ class SyncHandler(object): return state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state, filter_members=True + room_id, batch, sync_config, since_token, now_token, full_state=full_state ) if room_builder.rtype == "joined": -- cgit 1.5.1 From a6c8f7c875348ff8d63a7032c2f73a08551c516c Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 29 May 2018 01:09:55 +0100 Subject: add pydoc --- synapse/handlers/sync.py | 18 ++++++++---- synapse/storage/state.py | 76 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 67 insertions(+), 27 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 05bf6d46dd..8e38078332 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -423,7 +423,11 @@ class SyncHandler(object): Args: event(synapse.events.EventBase): event of interest - + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A Deferred map from ((type, state_key)->Event) """ @@ -440,6 +444,11 @@ class SyncHandler(object): Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A Deferred map from ((type, state_key)->Event) @@ -472,8 +481,6 @@ class SyncHandler(object): be None. now_token(str): Token of the end of the current batch. full_state(bool): Whether to force returning the full state. - lazy_load_members(bool): Whether to only return state for members - referenced in this timeline segment Returns: A deferred new event dictionary @@ -496,7 +503,7 @@ class SyncHandler(object): types = [ (EventTypes.Member, state_key) for state_key in set( - event.sender # FIXME: we also care about targets etc. + event.sender # FIXME: we also care about invite targets etc. for event in batch.events ) ] @@ -1398,7 +1405,8 @@ class SyncHandler(object): return state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, full_state=full_state + room_id, batch, sync_config, since_token, now_token, + full_state=full_state ) if room_builder.rtype == "joined": diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 55159e64d0..63b6834202 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -182,7 +182,19 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): - """Returns dictionary state_group -> (dict of (type, state_key) -> event id) + """Returns the state groups for a given set of groups, filtering on + types of state events. + + Args: + groups(list[int]): list of state group IDs to query + types(list[str|None, str|None])|None: List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. Presence of type of `None` indicates + that types not in the list should not be filtered out. If None, + all types are returned. + + Returns: + dictionary state_group -> (dict of (type, state_key) -> event id) """ results = {} @@ -204,6 +216,9 @@ class StateGroupWorkerStore(SQLBaseStore): if types is not None: type_set = set(types) if (None, None) in type_set: + # special case (None, None) to mean that other types should be + # returned - i.e. we were just filtering down the state keys + # for particular types. include_other_types = True type_set.remove((None, None)) types = list(type_set) # deduplicate types list @@ -360,10 +375,12 @@ class StateGroupWorkerStore(SQLBaseStore): that are in the `types` list. Args: - event_ids (list) - types (list): List of (type, state_key) tuples which are used to - filter the state fetched. `state_key` may be None, which matches - any `state_key` + event_ids (list[string]) + types (list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: deferred: A list of dicts corresponding to the event_ids given. @@ -399,9 +416,11 @@ class StateGroupWorkerStore(SQLBaseStore): Args: event_ids(list(str)): events whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A deferred dict from event_id -> (type, state_key) -> state_event @@ -427,9 +446,11 @@ class StateGroupWorkerStore(SQLBaseStore): Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A deferred dict from (type, state_key) -> state_event @@ -444,9 +465,11 @@ class StateGroupWorkerStore(SQLBaseStore): Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A deferred dict from (type, state_key) -> state_event @@ -492,11 +515,11 @@ class StateGroupWorkerStore(SQLBaseStore): missing state. Args: - group: The state group to lookup - types (list): List of 2-tuples of the form (`type`, `state_key`), - where a `state_key` of `None` matches all state_keys for the - `type`. Presence of type of `None` indicates that types not - in the list should not be filtered out. + group(int): The state group to lookup + types(list[str|None, str|None]): List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. Presence of type of `None` indicates + that types not in the list should not be filtered out. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) @@ -560,9 +583,18 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_for_groups(self, groups, types=None): """Given list of groups returns dict of group -> list of state events - with matching types. `types` is a list of `(type, state_key)`, where - a `state_key` of None matches all state_keys. If `types` is None then - all events are returned. + with matching types. + + Args: + groups(list[int]): list of groups whose state to query + types(list[str|None, str|None]|None): List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. Presence of type of `None` indicates + that types not in the list should not be filtered out. If None, + all events are returned. + + Returns: + dict of group -> list of state events """ if types: types = frozenset(types) -- cgit 1.5.1 From 5f6122fe102f994e023d530cb6076730f31f619f Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 4 Jun 2018 00:08:52 +0300 Subject: more comments --- synapse/handlers/sync.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8e38078332..7ab97b24a6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -515,6 +515,9 @@ class SyncHandler(object): if not types: # an optimisation to stop needlessly trying to calculate # member_state_ids + # + # XXX: i can't remember what this trying to do. why would + # types ever be []? --matthew lazy_load_members = False types.append((None, None)) # don't just filter to room members @@ -568,6 +571,10 @@ class SyncHandler(object): ) if lazy_load_members: + # TODO: filter out redundant members based on their event_ids + # (not mxids) at this point. In practice, limited syncs are + # relatively rare so it's not a total disaster to send redundant + # members down at this point. member_state_ids = { t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member -- cgit 1.5.1 From 8503dd0047119caa5b98a3fd56ac2b14dd09af0b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jun 2018 16:03:16 +0100 Subject: Remove event re-signing hacks These "temporary fixes" have been here three and a half years, and I can't find any events in the matrix.org database where the calculated signature differs from what's in the db. It's time for them to go away. --- synapse/handlers/federation.py | 43 ------------------------------------------ 1 file changed, 43 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcf94befb7..60b97b140e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -938,16 +938,6 @@ class FederationHandler(BaseHandler): [auth_id for auth_id, _ in event.auth_events], include_given=True ) - - for event in auth: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue([e for e in auth]) @log_function @@ -1405,18 +1395,6 @@ class FederationHandler(BaseHandler): del results[(event.type, event.state_key)] res = list(results.values()) - for event in res: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue(res) else: defer.returnValue([]) @@ -1481,18 +1459,6 @@ class FederationHandler(BaseHandler): ) if event: - if self.is_mine_id(event.event_id): - # FIXME: This is a temporary work around where we occasionally - # return events slightly differently than when they were - # originally signed - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - if do_auth: in_room = yield self.auth.check_host_in_room( event.room_id, @@ -1760,15 +1726,6 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) - for event in ret["auth_chain"]: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) -- cgit 1.5.1 From ea69d3565110a20f5abe00f1a5c2357b483140fb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 16 Jul 2018 11:38:45 +0100 Subject: Move filter_events_for_server out of FederationHandler for easier unit testing. --- synapse/handlers/federation.py | 144 ++--------------------------------------- synapse/visibility.py | 132 +++++++++++++++++++++++++++++++++++++ tests/test_visibility.py | 107 ++++++++++++++++++++++++++++++ 3 files changed, 245 insertions(+), 138 deletions(-) create mode 100644 tests/test_visibility.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d3ecebd29f..20fb46fc89 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,7 +43,6 @@ from synapse.crypto.event_signing import ( add_hashes_and_signatures, compute_event_signature, ) -from synapse.events.utils import prune_event from synapse.events.validator import EventValidator from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id @@ -52,8 +51,8 @@ from synapse.util.async import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function -from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination +from synapse.visibility import filter_events_for_server from ._base import BaseHandler @@ -501,137 +500,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - @measure_func("_filter_events_for_server") - @defer.inlineCallbacks - def _filter_events_for_server(self, server_name, room_id, events): - """Filter the given events for the given server, redacting those the - server can't see. - - Assumes the server is currently in the room. - - Returns - list[FrozenEvent] - """ - # First lets check to see if all the events have a history visibility - # of "shared" or "world_readable". If thats the case then we don't - # need to check membership (as we know the server is in the room). - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - ) - ) - - visibility_ids = set() - for sids in event_to_state_ids.itervalues(): - hist = sids.get((EventTypes.RoomHistoryVisibility, "")) - if hist: - visibility_ids.add(hist) - - # If we failed to find any history visibility events then the default - # is "shared" visiblity. - if not visibility_ids: - defer.returnValue(events) - - event_map = yield self.store.get_events(visibility_ids) - all_open = all( - e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in event_map.itervalues() - ) - - if all_open: - defer.returnValue(events) - - # Ok, so we're dealing with events that have non-trivial visibility - # rules, so we need to also get the memberships of the room. - - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), - ) - ) - - # We only want to pull out member events that correspond to the - # server's domain. - - def check_match(id): - try: - return server_name == get_domain_from_id(id) - except Exception: - return False - - # Parses mapping `event_id -> (type, state_key) -> state event_id` - # to get all state ids that we're interested in. - event_map = yield self.store.get_events([ - e_id - for key_to_eid in list(event_to_state_ids.values()) - for key, e_id in key_to_eid.items() - if key[0] != EventTypes.Member or check_match(key[1]) - ]) - - event_to_state = { - e_id: { - key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() - if inner_e_id in event_map - } - for e_id, key_to_eid in event_to_state_ids.iteritems() - } - - erased_senders = yield self.store.are_users_erased( - e.sender for e in events, - ) - - def redact_disallowed(event, state): - # if the sender has been gdpr17ed, always return a redacted - # copy of the event. - if erased_senders[event.sender]: - logger.info( - "Sender of %s has been erased, redacting", - event.event_id, - ) - return prune_event(event) - - if not state: - return event - - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - if visibility in ["invited", "joined"]: - # We now loop through all state events looking for - # membership states for the requesting server to determine - # if the server is either in the room or has been invited - # into the room. - for ev in state.itervalues(): - if ev.type != EventTypes.Member: - continue - try: - domain = get_domain_from_id(ev.state_key) - except Exception: - continue - - if domain != server_name: - continue - - memtype = ev.membership - if memtype == Membership.JOIN: - return event - elif memtype == Membership.INVITE: - if visibility == "invited": - return event - else: - return prune_event(event) - - return event - - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities): @@ -1558,7 +1426,7 @@ class FederationHandler(BaseHandler): limit ) - events = yield self._filter_events_for_server(origin, room_id, events) + events = yield filter_events_for_server(self.store, origin, events) defer.returnValue(events) @@ -1605,8 +1473,8 @@ class FederationHandler(BaseHandler): if not in_room: raise AuthError(403, "Host not in room.") - events = yield self._filter_events_for_server( - origin, event.room_id, [event] + events = yield filter_events_for_server( + self.store, origin, [event], ) event = events[0] defer.returnValue(event) @@ -1896,8 +1764,8 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) - missing_events = yield self._filter_events_for_server( - origin, room_id, missing_events, + missing_events = yield filter_events_for_server( + self.store, origin, missing_events, ) defer.returnValue(missing_events) diff --git a/synapse/visibility.py b/synapse/visibility.py index 015c2bab37..ddce41efaf 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -20,6 +20,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events.utils import prune_event +from synapse.types import get_domain_from_id from synapse.util.logcontext import make_deferred_yieldable, preserve_fn logger = logging.getLogger(__name__) @@ -225,3 +226,134 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, # we turn it into a list before returning it. defer.returnValue(list(filtered_events)) + + +@defer.inlineCallbacks +def filter_events_for_server(store, server_name, events): + """Filter the given events for the given server, redacting those the + server can't see. + + Assumes the server is currently in the room. + + Returns + list[FrozenEvent] + """ + # First lets check to see if all the events have a history visibility + # of "shared" or "world_readable". If thats the case then we don't + # need to check membership (as we know the server is in the room). + event_to_state_ids = yield store.get_state_ids_for_events( + frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + ) + ) + + visibility_ids = set() + for sids in event_to_state_ids.itervalues(): + hist = sids.get((EventTypes.RoomHistoryVisibility, "")) + if hist: + visibility_ids.add(hist) + + # If we failed to find any history visibility events then the default + # is "shared" visiblity. + if not visibility_ids: + defer.returnValue(events) + + event_map = yield store.get_events(visibility_ids) + all_open = all( + e.content.get("history_visibility") in (None, "shared", "world_readable") + for e in event_map.itervalues() + ) + + if all_open: + defer.returnValue(events) + + # Ok, so we're dealing with events that have non-trivial visibility + # rules, so we need to also get the memberships of the room. + + event_to_state_ids = yield store.get_state_ids_for_events( + frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ) + ) + + # We only want to pull out member events that correspond to the + # server's domain. + + def check_match(id): + try: + return server_name == get_domain_from_id(id) + except Exception: + return False + + # Parses mapping `event_id -> (type, state_key) -> state event_id` + # to get all state ids that we're interested in. + event_map = yield store.get_events([ + e_id + for key_to_eid in list(event_to_state_ids.values()) + for key, e_id in key_to_eid.items() + if key[0] != EventTypes.Member or check_match(key[1]) + ]) + + event_to_state = { + e_id: { + key: event_map[inner_e_id] + for key, inner_e_id in key_to_eid.iteritems() + if inner_e_id in event_map + } + for e_id, key_to_eid in event_to_state_ids.iteritems() + } + + erased_senders = yield store.are_users_erased( + e.sender for e in events, + ) + + def redact_disallowed(event, state): + # if the sender has been gdpr17ed, always return a redacted + # copy of the event. + if erased_senders[event.sender]: + logger.info( + "Sender of %s has been erased, redacting", + event.event_id, + ) + return prune_event(event) + + if not state: + return event + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + if visibility in ["invited", "joined"]: + # We now loop through all state events looking for + # membership states for the requesting server to determine + # if the server is either in the room or has been invited + # into the room. + for ev in state.itervalues(): + if ev.type != EventTypes.Member: + continue + try: + domain = get_domain_from_id(ev.state_key) + except Exception: + continue + + if domain != server_name: + continue + + memtype = ev.membership + if memtype == Membership.JOIN: + return event + elif memtype == Membership.INVITE: + if visibility == "invited": + return event + else: + return prune_event(event) + + return event + + defer.returnValue([ + redact_disallowed(e, event_to_state[e.event_id]) + for e in events + ]) diff --git a/tests/test_visibility.py b/tests/test_visibility.py new file mode 100644 index 0000000000..86981958cb --- /dev/null +++ b/tests/test_visibility.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from twisted.internet import defer + +from synapse.visibility import filter_events_for_server +from tests import unittest +from tests.utils import setup_test_homeserver + +logger = logging.getLogger(__name__) + +TEST_ROOM_ID = "!TEST:ROOM" + + +class FilterEventsForServerTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield setup_test_homeserver() + self.event_creation_handler = self.hs.get_event_creation_handler() + self.event_builder_factory = self.hs.get_event_builder_factory() + self.store = self.hs.get_datastore() + + @defer.inlineCallbacks + def test_filtering(self): + # + # The events to be filtered consist of 10 membership events (it doesn't + # really matter if they are joins or leaves, so let's make them joins). + # One of those membership events is going to be for a user on the + # server we are filtering for (so we can check the filtering is doing + # the right thing). + # + + # before we do that, we persist some other events to act as state. + self.inject_visibility("@admin:hs", "joined") + for i in range(0, 10): + yield self.inject_room_member("@resident%i:hs" % i) + + events_to_filter = [] + + for i in range(0, 10): + user = "@user%i:%s" % ( + i, "test_server" if i == 5 else "other_server" + ) + evt = yield self.inject_room_member(user, extra_content={"a": "b"}) + events_to_filter.append(evt) + + filtered = yield filter_events_for_server( + self.store, "test_server", events_to_filter, + ) + + # the result should be 5 redacted events, and 5 unredacted events. + for i in range(0, 5): + self.assertEqual(events_to_filter[i].event_id, filtered[i].event_id) + self.assertNotIn("a", filtered[i].content) + + for i in range(5, 10): + self.assertEqual(events_to_filter[i].event_id, filtered[i].event_id) + self.assertEqual(filtered[i].content["a"], "b") + + @defer.inlineCallbacks + def inject_visibility(self, user_id, visibility): + content = {"history_visibility": visibility} + builder = self.event_builder_factory.new({ + "type": "m.room.history_visibility", + "sender": user_id, + "state_key": "", + "room_id": TEST_ROOM_ID, + "content": content, + }) + + event, context = yield self.event_creation_handler.create_new_client_event( + builder + ) + yield self.hs.get_datastore().persist_event(event, context) + defer.returnValue(event) + + @defer.inlineCallbacks + def inject_room_member(self, user_id, membership="join", extra_content={}): + content = {"membership": membership} + content.update(extra_content) + builder = self.event_builder_factory.new({ + "type": "m.room.member", + "sender": user_id, + "state_key": user_id, + "room_id": TEST_ROOM_ID, + "content": content, + }) + + event, context = yield self.event_creation_handler.create_new_client_event( + builder + ) + + yield self.hs.get_datastore().persist_event(event, context) + defer.returnValue(event) -- cgit 1.5.1 From 8cb8df55e9bb5de27d9e6570441560eb81db36df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Jul 2018 15:22:02 +0100 Subject: Split MessageHandler into read only and writers This will let us call the read only parts from workers, and so be able to move some APIs off of master, e.g. the `/state` API. --- synapse/handlers/__init__.py | 2 - synapse/handlers/message.py | 281 +++++++++++++++++++++------------------- synapse/rest/client/v1/admin.py | 8 +- synapse/rest/client/v1/room.py | 20 ++- synapse/server.py | 14 +- 5 files changed, 176 insertions(+), 149 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 4b9923d8c0..0bad3e0a2e 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,6 @@ from .admin import AdminHandler from .directory import DirectoryHandler from .federation import FederationHandler from .identity import IdentityHandler -from .message import MessageHandler from .register import RegistrationHandler from .room import RoomContextHandler from .search import SearchHandler @@ -44,7 +43,6 @@ class Handlers(object): def __init__(self, hs): self.registration_handler = RegistrationHandler(hs) - self.message_handler = MessageHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852ceb..3c6f9860d5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -75,12 +75,159 @@ class PurgeStatus(object): } -class MessageHandler(BaseHandler): +class MessageHandler(object): + """Contains some read only APIs to get state about a room + """ def __init__(self, hs): - super(MessageHandler, self).__init__(hs) - self.hs = hs + self.auth = hs.get_auth() + self.clock = hs.get_clock() self.state = hs.get_state_handler() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_room_data(self, user_id=None, room_id=None, + event_type=None, state_key="", is_guest=False): + """ Get data from a room. + + Args: + event : The room path event + Returns: + The path data content. + Raises: + SynapseError if something went wrong. + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + data = yield self.state.get_current_state( + room_id, event_type, state_key + ) + elif membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + [membership_event_id], [key] + ) + data = room_state[membership_event_id].get(key) + + defer.returnValue(data) + + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) + + @defer.inlineCallbacks + def get_state_events(self, user_id, room_id, is_guest=False): + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A list of dicts representing state events. [{}, {}, {}] + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + room_state = yield self.state.get_current_state(room_id) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], None + ) + room_state = room_state[membership_event_id] + + now = self.clock.time_msec() + defer.returnValue( + [serialize_event(c, now) for c in room_state.values()] + ) + + @defer.inlineCallbacks + def get_joined_members(self, requester, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + requester(Requester): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or because there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in iteritems(users_with_profile) + }) + + +class PaginationHandler(MessageHandler): + """Handles pagination and purge history requests. + + These are in the same handler due to the fact we need to block clients + paginating during a purge. + + This subclasses MessageHandler to get at _check_in_room_or_world_readable + """ + + def __init__(self, hs): + super(PaginationHandler, self).__init__(hs) + + self.hs = hs + self.store = hs.get_datastore() self.clock = hs.get_clock() self.pagination_lock = ReadWriteLock() @@ -274,134 +421,6 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) - @defer.inlineCallbacks - def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key="", is_guest=False): - """ Get data from a room. - - Args: - event : The room path event - Returns: - The path data content. - Raises: - SynapseError if something went wrong. - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - elif membership == Membership.LEAVE: - key = (event_type, state_key) - room_state = yield self.store.get_state_for_events( - [membership_event_id], [key] - ) - data = room_state[membership_event_id].get(key) - - defer.returnValue(data) - - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): - """Retrieve all state events for a given room. If the user is - joined to the room then return the current state. If the user has - left the room return the state events from when they left. - - Args: - user_id(str): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A list of dicts representing state events. [{}, {}, {}] - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], None - ) - room_state = room_state[membership_event_id] - - now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] - ) - - @defer.inlineCallbacks - def get_joined_members(self, requester, room_id): - """Get all the joined members in the room and their profile information. - - If the user has left the room return the state events from when they left. - - Args: - requester(Requester): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A dict of user_id to profile info - """ - user_id = requester.user.to_string() - if not requester.app_service: - # We check AS auth after fetching the room membership, as it - # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - if membership != Membership.JOIN: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" - ) - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - - # If this is an AS, double check that they are allowed to see the members. - # This can either be because the AS user is in the room or because there - # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: - for uid in users_with_profile: - if requester.app_service.is_interested_in_user(uid): - break - else: - # Loop fell through, AS has no interested users in room - raise AuthError(403, "Appservice not in room") - - defer.returnValue({ - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in iteritems(users_with_profile) - }) - class EventCreationHandler(object): def __init__(self, hs): diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 2dc50e582b..13fd63a5b2 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -123,7 +123,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): hs (synapse.server.HomeServer) """ super(PurgeHistoryRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() self.store = hs.get_datastore() @defer.inlineCallbacks @@ -198,7 +198,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.BAD_JSON, ) - purge_id = yield self.handlers.message_handler.start_purge_history( + purge_id = yield self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events, ) @@ -220,7 +220,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet): hs (synapse.server.HomeServer) """ super(PurgeHistoryStatusRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() @defer.inlineCallbacks def on_GET(self, request, purge_id): @@ -230,7 +230,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - purge_status = self.handlers.message_handler.get_purge_status(purge_id) + purge_status = self.pagination_handler.get_purge_status(purge_id) if purge_status is None: raise NotFoundError("purge id '%s' not found" % purge_id) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3d62447854..8b6be9da96 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() self.event_creation_hander = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() + self.message_handler = hs.get_message_handler() def register(self, http_server): # /room/$roomid/state/$eventtype @@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): format = parse_string(request, "format", default="content", allowed_values=["content", "event"]) - msg_handler = self.handlers.message_handler + msg_handler = self.message_handler data = yield msg_handler.get_room_data( user_id=requester.user.to_string(), room_id=room_id, @@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMemberListRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) - handler = self.handlers.message_handler - events = yield handler.get_state_events( + events = yield self.message_handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), ) @@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(JoinedRoomMemberListRestServlet, self).__init__(hs) - self.message_handler = hs.get_handlers().message_handler + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMessageListRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): event_filter = Filter(json.loads(filter_json)) else: event_filter = None - handler = self.handlers.message_handler - msgs = yield handler.get_messages( + msgs = yield self.pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomStateRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) - handler = self.handlers.message_handler # Get all the current state for this room - events = yield handler.get_state_events( + events = yield self.message_handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), is_guest=requester.is_guest, diff --git a/synapse/server.py b/synapse/server.py index 92bea96c5c..b93bd6d7d9 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -52,7 +52,11 @@ from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler -from synapse.handlers.message import EventCreationHandler +from synapse.handlers.message import ( + EventCreationHandler, + MessageHandler, + PaginationHandler, +) from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler @@ -163,6 +167,8 @@ class HomeServer(object): 'federation_registry', 'server_notices_manager', 'server_notices_sender', + 'message_handler', + 'pagination_handler', ] def __init__(self, hostname, reactor=None, **kwargs): @@ -426,6 +432,12 @@ class HomeServer(object): return WorkerServerNoticesSender(self) return ServerNoticesSender(self) + def build_message_handler(self): + return MessageHandler(self) + + def build_pagination_handler(self): + return PaginationHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.5.1 From bacdf0cbf9fdbf9bbab2420b86308830ac4e4592 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Jul 2018 15:29:45 +0100 Subject: Move RoomContextHandler out of Handlers This is in preparation for moving GET /context/ to a worker --- synapse/handlers/__init__.py | 2 -- synapse/handlers/room.py | 6 +++++- synapse/rest/client/v1/room.py | 4 ++-- synapse/server.py | 6 +++++- 4 files changed, 12 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 0bad3e0a2e..413425fed1 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -18,7 +18,6 @@ from .directory import DirectoryHandler from .federation import FederationHandler from .identity import IdentityHandler from .register import RegistrationHandler -from .room import RoomContextHandler from .search import SearchHandler @@ -48,4 +47,3 @@ class Handlers(object): self.admin_handler = AdminHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) - self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f67512078b..6150b7e226 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler): ) -class RoomContextHandler(BaseHandler): +class RoomContextHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + @defer.inlineCallbacks def get_event_context(self, user, room_id, event_id, limit): """Retrieves events, pagination tokens and state around a given event diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 8b6be9da96..2c9459534e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -523,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomEventContextServlet, self).__init__(hs) self.clock = hs.get_clock() - self.handlers = hs.get_handlers() + self.room_context_handler = hs.get_room_context_handler() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -531,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet): limit = parse_integer(request, "limit", default=10) - results = yield self.handlers.room_context_handler.get_event_context( + results = yield self.room_context_handler.get_event_context( requester.user, room_id, event_id, diff --git a/synapse/server.py b/synapse/server.py index b93bd6d7d9..a24ea158df 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -61,7 +61,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler -from synapse.handlers.room import RoomCreationHandler +from synapse.handlers.room import RoomContextHandler, RoomCreationHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler @@ -169,6 +169,7 @@ class HomeServer(object): 'server_notices_sender', 'message_handler', 'pagination_handler', + 'room_context_handler', ] def __init__(self, hostname, reactor=None, **kwargs): @@ -438,6 +439,9 @@ class HomeServer(object): def build_pagination_handler(self): return PaginationHandler(self) + def build_room_context_handler(self): + return RoomContextHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.5.1 From 924eb34d9428a4163a03249abbb6f40d4baa29c6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 18:32:02 +0100 Subject: add a filtered_types param to limit filtering to specific types --- synapse/handlers/sync.py | 65 +++++++++++++++------------ synapse/storage/state.py | 113 +++++++++++++++++++++++++---------------------- 2 files changed, 96 insertions(+), 82 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0c21ac2c77..cb711b8758 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -417,38 +417,44 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event, types=None): + def get_state_after_event(self, event, types=None, filtered_types=None): """ Get the room state after the given event Args: event(synapse.events.EventBase): event of interest - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + Returns: A Deferred map from ((type, state_key)->Event) """ - state_ids = yield self.store.get_state_ids_for_event(event.event_id, types) + state_ids = yield self.store.get_state_ids_for_event( + event.event_id, types, filtered_types=filtered_types + ) if event.is_state(): state_ids = state_ids.copy() state_ids[(event.type, event.state_key)] = event.event_id defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position, types=None): + def get_state_at(self, room_id, stream_position, types=None, filtered_types=None): """ Get the room state at a particular stream position Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. - May be None, which matches any key. + all events are returned of the given type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A Deferred map from ((type, state_key)->Event) @@ -463,7 +469,9 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] - state = yield self.get_state_after_event(last_event, types) + state = yield self.get_state_after_event( + last_event, types, filtered_types=filtered_types + ) else: # no events in this room - so presumably no state @@ -499,6 +507,7 @@ class SyncHandler(object): types = None member_state_ids = {} lazy_load_members = sync_config.filter_collection.lazy_load_members() + filtered_types = None if lazy_load_members: # We only request state for the members needed to display the @@ -516,29 +525,25 @@ class SyncHandler(object): # to be done based on event_id, and we don't have the member # event ids until we've pulled them out of the DB. - if not types: - # an optimisation to stop needlessly trying to calculate - # member_state_ids - # - # XXX: i can't remember what this trying to do. why would - # types ever be []? --matthew - lazy_load_members = False - - types.append((None, None)) # don't just filter to room members + # only apply the filtering to room members + filtered_types = [EventTypes.Member] if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types + batch.events[-1].event_id, types=types, + filtered_types=filtered_types ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types + batch.events[0].event_id, types=types, + filtered_types=filtered_types ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token, types=types + room_id, stream_position=now_token, types=types, + filtered_types=filtered_types ) state_ids = current_state_ids @@ -563,15 +568,18 @@ class SyncHandler(object): ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token, types=types + room_id, stream_position=since_token, types=types, + filtered_types=filtered_types ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types + batch.events[-1].event_id, types=types, + filtered_types=filtered_types ) state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types + batch.events[0].event_id, types=types, + filtered_types=filtered_types ) if lazy_load_members: @@ -603,11 +611,10 @@ class SyncHandler(object): # event_ids) at this point. We know we can do it based on mxid as this # is an non-gappy incremental sync. - # strip off the (None, None) and filter to just room members - types = types[:-1] if types: state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types + batch.events[0].event_id, types=types, + filtered_types=filtered_types ) state = {} diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c5ff44fef7..ee531a2ce0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -185,7 +185,7 @@ class StateGroupWorkerStore(SQLBaseStore): }) @defer.inlineCallbacks - def _get_state_groups_from_groups(self, groups, types): + def _get_state_groups_from_groups(self, groups, types, filtered_types=None): """Returns the state groups for a given set of groups, filtering on types of state events. @@ -193,9 +193,10 @@ class StateGroupWorkerStore(SQLBaseStore): groups(list[int]): list of state group IDs to query types(list[str|None, str|None])|None: List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all - state_keys for the `type`. Presence of type of `None` indicates - that types not in the list should not be filtered out. If None, - all types are returned. + state_keys for the `type`. If None, all types are returned. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: dictionary state_group -> (dict of (type, state_key) -> event id) @@ -206,26 +207,21 @@ class StateGroupWorkerStore(SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, + self._get_state_groups_from_groups_txn, chunk, types, filtered_types ) results.update(res) defer.returnValue(results) - def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + def _get_state_groups_from_groups_txn( + self, txn, groups, types=None, filtered_types=None + ): results = {group: {} for group in groups} - include_other_types = False + include_other_types = False if filtered_types is None else True if types is not None: - type_set = set(types) - if (None, None) in type_set: - # special case (None, None) to mean that other types should be - # returned - i.e. we were just filtering down the state keys - # for particular types. - include_other_types = True - type_set.remove((None, None)) - types = list(type_set) # deduplicate types list + types = list(set(types)) # deduplicate types list if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -276,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore): if include_other_types: # XXX: check whether this slows postgres down like a list of # ORs does too? - unique_types = set([t for (t, _) in types]) + unique_types = set(filtered_types) clause_to_args.append( ( "AND type <> ? " * len(unique_types), @@ -313,7 +309,7 @@ class StateGroupWorkerStore(SQLBaseStore): where_args.extend([typ[0], typ[1]]) if include_other_types: - unique_types = set([t for (t, _) in types]) + unique_types = set(filtered_types) where_clauses.append( "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" ) @@ -373,18 +369,20 @@ class StateGroupWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_state_for_events(self, event_ids, types): + def get_state_for_events(self, event_ids, types, filtered_types): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. Args: event_ids (list[string]) - types (list[(str|None, str|None)]|None): List of (type, state_key) tuples + types (list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: deferred: A list of dicts corresponding to the event_ids given. @@ -395,7 +393,7 @@ class StateGroupWorkerStore(SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) state_event_map = yield self.get_events( [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], @@ -414,17 +412,19 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_ids_for_events(self, event_ids, types=None): + def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ Get the state dicts corresponding to a list of events Args: event_ids(list(str)): events whose state should be returned - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from event_id -> (type, state_key) -> state_event @@ -434,7 +434,7 @@ class StateGroupWorkerStore(SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) event_to_state = { event_id: group_to_state[group] @@ -444,41 +444,45 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_for_event(self, event_id, types=None): + def get_state_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_for_events([event_id], types) + state_map = yield self.get_state_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @defer.inlineCallbacks - def get_state_ids_for_event(self, event_id, types=None): + def get_state_ids_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_ids_for_events([event_id], types) + state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @cached(max_entries=50000) @@ -509,7 +513,7 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) - def _get_some_state_from_cache(self, group, types): + def _get_some_state_from_cache(self, group, types, filtered_types=None): """Checks if group is in cache. See `_get_state_for_groups` Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). @@ -520,29 +524,30 @@ class StateGroupWorkerStore(SQLBaseStore): Args: group(int): The state group to lookup - types(list[str|None, str|None]): List of 2-tuples of the form + types(list[str, str|None]): List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all - state_keys for the `type`. Presence of type of `None` indicates - that types not in the list should not be filtered out. + state_keys for the `type`. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} + + # tracks which of the requested types are missing from our cache missing_types = set() - include_other_types = False + include_other_types = True if filtered_types is None else False for typ, state_key in types: key = (typ, state_key) - if typ is None: - include_other_types = True - next - if state_key is None: type_to_key[typ] = None # XXX: why do we mark the type as missing from our cache just # because we weren't filtering on a specific value of state_key? + # is it because the cache doesn't handle wildcards? missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: @@ -556,7 +561,7 @@ class StateGroupWorkerStore(SQLBaseStore): def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return include_other_types + return include_other_types and typ not in filtered_types if valid_state_keys is None: return True if state_key in valid_state_keys: @@ -585,21 +590,23 @@ class StateGroupWorkerStore(SQLBaseStore): return state_dict_ids, is_all @defer.inlineCallbacks - def _get_state_for_groups(self, groups, types=None): + def _get_state_for_groups(self, groups, types=None, filtered_types=None): """Gets the state at each of a list of state groups, optionally filtering by type/state_key Args: groups (iterable[int]): list of state groups for which we want to get the state. - types (None|iterable[(None|str, None|str)]): + types (None|iterable[(None, None|str)]): indicates the state type/keys required. If None, the whole state is fetched and returned. Otherwise, each entry should be a `(type, state_key)` tuple to include in the response. A `state_key` of None is a wildcard - meaning that we require all state with that type. A `type` of None - indicates that types not in the list should not be filtered out. + meaning that we require all state with that type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: Deferred[dict[int, dict[(type, state_key), EventBase]]] @@ -612,7 +619,7 @@ class StateGroupWorkerStore(SQLBaseStore): if types is not None: for group in set(groups): state_dict_ids, _, got_all = self._get_some_state_from_cache( - group, types, + group, types, filtered_types ) results[group] = state_dict_ids @@ -645,7 +652,7 @@ class StateGroupWorkerStore(SQLBaseStore): types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, + missing_groups, types_to_fetch, filtered_types ) for group, group_state_dict in iteritems(group_to_state_dict): -- cgit 1.5.1 From bcaec2915ac74937171e27d507b8f9c0e39d3677 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 19:03:50 +0100 Subject: incorporate review --- synapse/handlers/sync.py | 44 +++++++++++++++++++++++++++----------------- synapse/storage/state.py | 7 ++++--- 2 files changed, 31 insertions(+), 20 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cb711b8758..b597f94cf6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -435,7 +435,7 @@ class SyncHandler(object): A Deferred map from ((type, state_key)->Event) """ state_ids = yield self.store.get_state_ids_for_event( - event.event_id, types, filtered_types=filtered_types + event.event_id, types, filtered_types=filtered_types, ) if event.is_state(): state_ids = state_ids.copy() @@ -470,7 +470,7 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] state = yield self.get_state_after_event( - last_event, types, filtered_types=filtered_types + last_event, types, filtered_types=filtered_types, ) else: @@ -505,7 +505,6 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): types = None - member_state_ids = {} lazy_load_members = sync_config.filter_collection.lazy_load_members() filtered_types = None @@ -521,10 +520,6 @@ class SyncHandler(object): ) ] - # We can't remove redundant member types at this stage as it has - # to be done based on event_id, and we don't have the member - # event ids until we've pulled them out of the DB. - # only apply the filtering to room members filtered_types = [EventTypes.Member] @@ -532,27 +527,32 @@ class SyncHandler(object): if batch: current_state_ids = yield self.store.get_state_ids_for_event( batch.events[-1].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) else: current_state_ids = yield self.get_state_at( room_id, stream_position=now_token, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state_ids = current_state_ids + # track the membership state events as of the beginning of this + # timeline sequence, so they can be filtered out of the state + # if we are lazy loading members. if lazy_load_members: member_state_ids = { t: state_ids[t] for t in state_ids if t[0] == EventTypes.Member } + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id @@ -569,28 +569,38 @@ class SyncHandler(object): elif batch.limited: state_at_previous_sync = yield self.get_state_at( room_id, stream_position=since_token, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) current_state_ids = yield self.store.get_state_ids_for_event( batch.events[-1].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state_at_timeline_start = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) + # track the membership state events as of the beginning of this + # timeline sequence, so they can be filtered out of the state + # if we are lazy loading members. if lazy_load_members: - # TODO: filter out redundant members based on their event_ids - # (not mxids) at this point. In practice, limited syncs are + # TODO: optionally filter out redundant membership events at this + # point, to stop repeatedly sending members in every /sync as if + # the client isn't tracking them. + # When implement, this should filter using event_ids (not mxids). + # In practice, limited syncs are # relatively rare so it's not a total disaster to send redundant - # members down at this point. + # members down at this point. Redundant members are ones which + # repeatedly get sent down /sync because we don't know if the client + # is caching them or not. member_state_ids = { t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member } + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id @@ -614,7 +624,7 @@ class SyncHandler(object): if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state = {} diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ee531a2ce0..75c6366e7a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -545,9 +545,10 @@ class StateGroupWorkerStore(SQLBaseStore): if state_key is None: type_to_key[typ] = None - # XXX: why do we mark the type as missing from our cache just - # because we weren't filtering on a specific value of state_key? - # is it because the cache doesn't handle wildcards? + # we mark the type as missing from the cache because + # when the cache was populated it might have been done with a + # restricted set of state_keys, so the wildcard will not work + # and the cache may be incomplete. missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: -- cgit 1.5.1 From 8462c26485fb4f19fc52accc05870c0ea4c8eb6a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 12:43:23 +0100 Subject: Improvements to the Limiter * give them names, to improve logging * use a deque rather than a list for efficiency --- synapse/handlers/message.py | 2 +- synapse/util/async.py | 33 ++++++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852ceb..8c12c6990f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -427,7 +427,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) + self.limiter = Limiter(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() diff --git a/synapse/util/async.py b/synapse/util/async.py index 7d5acecb1c..22071ddef7 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import logging from contextlib import contextmanager @@ -248,11 +249,16 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count, clock=None): + def __init__(self, max_count=1, name=None, clock=None): """ Args: - max_count(int): The maximum number of concurrent access + max_count(int): The maximum number of concurrent accesses """ + if name is None: + self.name = id(self) + else: + self.name = name + if not clock: from twisted.internet import reactor clock = Clock(reactor) @@ -260,14 +266,14 @@ class Limiter(object): self.max_count = max_count # key_to_defer is a map from the key to a 2 element list where - # the first element is the number of things executing - # the second element is a list of deferreds for the things blocked from + # the first element is the number of things executing, and + # the second element is a deque of deferreds for the things blocked from # executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, []]) + entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -277,10 +283,10 @@ class Limiter(object): new_defer = defer.Deferred() entry[1].append(new_defer) - logger.info("Waiting to acquire limiter lock for key %r", key) - with PreserveLoggingContext(): - yield new_defer - logger.info("Acquired limiter lock for key %r", key) + logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key) + yield make_deferred_yieldable(new_defer) + + logger.info("Acquired limiter lock %r for key %r", self.name, key) entry[0] += 1 # if the code holding the lock completes synchronously, then it @@ -296,7 +302,7 @@ class Limiter(object): yield self._clock.sleep(0) else: - logger.info("Acquired uncontended limiter lock for key %r", key) + logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key) entry[0] += 1 @contextmanager @@ -304,15 +310,16 @@ class Limiter(object): try: yield finally: - logger.info("Releasing limiter lock for key %r", key) + logger.info("Releasing limiter lock %r for key %r", self.name, key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them entry[0] -= 1 if entry[1]: - next_def = entry[1].pop(0) + next_def = entry[1].popleft() + # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): next_def.callback(None) elif entry[0] == 0: -- cgit 1.5.1 From 7c712f95bbc7f405355d5714c92d65551f64fec2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 13:11:43 +0100 Subject: Combine Limiter and Linearizer Linearizer was effectively a Limiter with max_count=1, so rather than maintaining two sets of code, let's combine them. --- synapse/handlers/message.py | 4 +- synapse/util/async.py | 99 +++++-------------------------------------- tests/util/test_limiter.py | 70 ------------------------------ tests/util/test_linearizer.py | 47 ++++++++++++++++++++ 4 files changed, 59 insertions(+), 161 deletions(-) delete mode 100644 tests/util/test_limiter.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8c12c6990f..abc07ea87c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master from synapse.types import RoomAlias, RoomStreamToken, UserID -from synapse.util.async import Limiter, ReadWriteLock +from synapse.util.async import Linearizer, ReadWriteLock from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func @@ -427,7 +427,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5, name="room_event_creation_limit") + self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() diff --git a/synapse/util/async.py b/synapse/util/async.py index 22071ddef7..5a50d9700f 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -157,91 +157,8 @@ def concurrently_execute(func, args, limit): class Linearizer(object): - """Linearizes access to resources based on a key. Useful to ensure only one - thing is happening at a time on a given resource. - - Example: - - with (yield linearizer.queue("test_key")): - # do some work. - - """ - def __init__(self, name=None, clock=None): - if name is None: - self.name = id(self) - else: - self.name = name - self.key_to_defer = {} - - if not clock: - from twisted.internet import reactor - clock = Clock(reactor) - self._clock = clock - - @defer.inlineCallbacks - def queue(self, key): - # If there is already a deferred in the queue, we pull it out so that - # we can wait on it later. - # Then we replace it with a deferred that we resolve *after* the - # context manager has exited. - # We only return the context manager after the previous deferred has - # resolved. - # This all has the net effect of creating a chain of deferreds that - # wait for the previous deferred before starting their work. - current_defer = self.key_to_defer.get(key) - - new_defer = defer.Deferred() - self.key_to_defer[key] = new_defer - - if current_defer: - logger.info( - "Waiting to acquire linearizer lock %r for key %r", self.name, key - ) - try: - with PreserveLoggingContext(): - yield current_defer - except Exception: - logger.exception("Unexpected exception in Linearizer") - - logger.info("Acquired linearizer lock %r for key %r", self.name, - key) - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (There's no particular need for it to happen before we return - # the context manager, but it needs to happen while we hold the - # lock, and the context manager's exit code must be synchronous, - # so actually this is the only sensible place. - yield self._clock.sleep(0) - - else: - logger.info("Acquired uncontended linearizer lock %r for key %r", - self.name, key) - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - logger.info("Releasing linearizer lock %r for key %r", self.name, key) - with PreserveLoggingContext(): - new_defer.callback(None) - current_d = self.key_to_defer.get(key) - if current_d is new_defer: - self.key_to_defer.pop(key, None) - - defer.returnValue(_ctx_manager()) - - -class Limiter(object): """Limits concurrent access to resources based on a key. Useful to ensure - only a few thing happen at a time on a given resource. + only a few things happen at a time on a given resource. Example: @@ -249,7 +166,7 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count=1, name=None, clock=None): + def __init__(self, name=None, max_count=1, clock=None): """ Args: max_count(int): The maximum number of concurrent accesses @@ -283,10 +200,12 @@ class Limiter(object): new_defer = defer.Deferred() entry[1].append(new_defer) - logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key) + logger.info( + "Waiting to acquire linearizer lock %r for key %r", self.name, key, + ) yield make_deferred_yieldable(new_defer) - logger.info("Acquired limiter lock %r for key %r", self.name, key) + logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 # if the code holding the lock completes synchronously, then it @@ -302,7 +221,9 @@ class Limiter(object): yield self._clock.sleep(0) else: - logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key) + logger.info( + "Acquired uncontended linearizer lock %r for key %r", self.name, key, + ) entry[0] += 1 @contextmanager @@ -310,7 +231,7 @@ class Limiter(object): try: yield finally: - logger.info("Releasing limiter lock %r for key %r", self.name, key) + logger.info("Releasing linearizer lock %r for key %r", self.name, key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py deleted file mode 100644 index f7b942f5c1..0000000000 --- a/tests/util/test_limiter.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from synapse.util.async import Limiter - -from tests import unittest - - -class LimiterTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def test_limiter(self): - limiter = Limiter(3) - - key = object() - - d1 = limiter.queue(key) - cm1 = yield d1 - - d2 = limiter.queue(key) - cm2 = yield d2 - - d3 = limiter.queue(key) - cm3 = yield d3 - - d4 = limiter.queue(key) - self.assertFalse(d4.called) - - d5 = limiter.queue(key) - self.assertFalse(d5.called) - - with cm1: - self.assertFalse(d4.called) - self.assertFalse(d5.called) - - cm4 = yield d4 - self.assertFalse(d5.called) - - with cm3: - self.assertFalse(d5.called) - - cm5 = yield d5 - - with cm2: - pass - - with cm4: - pass - - with cm5: - pass - - d6 = limiter.queue(key) - with (yield d6): - pass diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c95907b32c..c9563124f9 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -65,3 +66,49 @@ class LinearizerTestCase(unittest.TestCase): func(i) return func(1000) + + @defer.inlineCallbacks + def test_multiple_entries(self): + limiter = Linearizer(max_count=3) + + key = object() + + d1 = limiter.queue(key) + cm1 = yield d1 + + d2 = limiter.queue(key) + cm2 = yield d2 + + d3 = limiter.queue(key) + cm3 = yield d3 + + d4 = limiter.queue(key) + self.assertFalse(d4.called) + + d5 = limiter.queue(key) + self.assertFalse(d5.called) + + with cm1: + self.assertFalse(d4.called) + self.assertFalse(d5.called) + + cm4 = yield d4 + self.assertFalse(d5.called) + + with cm3: + self.assertFalse(d5.called) + + cm5 = yield d5 + + with cm2: + pass + + with cm4: + pass + + with cm5: + pass + + d6 = limiter.queue(key) + with (yield d6): + pass -- cgit 1.5.1 From 0ecf68aedc09f4037208b613b692a8a98c78b3ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jul 2018 15:30:59 +0100 Subject: Move check_in_room_or_world_readable to Auth --- synapse/api/auth.py | 34 ++++++++++++++++++++++++++++++++++ synapse/handlers/message.py | 40 ++++++---------------------------------- 2 files changed, 40 insertions(+), 34 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bc629832d9..bf9efb170a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -737,3 +737,37 @@ class Auth(object): ) return query_params[0] + + @defer.inlineCallbacks + def check_in_room_or_world_readable(self, room_id, user_id): + """Checks that the user is or was in the room or the room is world + readable. If it isn't then an exception is raised. + + Returns: + Deferred[tuple[str, str|None]]: Resolves to the current membership of + the user in the room and the membership event ID of the user. If + the user is not in the room and never has been, then + `(Membership.JOIN, None)` is returned. + """ + + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + except AuthError: + visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3c6f9860d5..c1489cd066 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -97,7 +97,7 @@ class MessageHandler(object): Raises: SynapseError if something went wrong. """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) @@ -114,31 +114,6 @@ class MessageHandler(object): defer.returnValue(data) - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - @defer.inlineCallbacks def get_state_events(self, user_id, room_id, is_guest=False): """Retrieve all state events for a given room. If the user is @@ -151,7 +126,7 @@ class MessageHandler(object): Returns: A list of dicts representing state events. [{}, {}, {}] """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) @@ -184,7 +159,7 @@ class MessageHandler(object): if not requester.app_service: # We check AS auth after fetching the room membership, as it # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( + membership, _ = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership != Membership.JOIN: @@ -214,19 +189,16 @@ class MessageHandler(object): }) -class PaginationHandler(MessageHandler): +class PaginationHandler(object): """Handles pagination and purge history requests. These are in the same handler due to the fact we need to block clients paginating during a purge. - - This subclasses MessageHandler to get at _check_in_room_or_world_readable """ def __init__(self, hs): - super(PaginationHandler, self).__init__(hs) - self.hs = hs + self.auth = hs.get_auth() self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -349,7 +321,7 @@ class PaginationHandler(MessageHandler): source_config = pagin_config.get_source_config("room") with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self._check_in_room_or_world_readable( + membership, member_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) -- cgit 1.5.1 From 5c88bb722f57af1c77f34d77152689425ab95eba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jul 2018 15:32:23 +0100 Subject: Move PaginationHandler to its own file --- synapse/handlers/message.py | 242 +------------------------------------ synapse/handlers/pagination.py | 265 +++++++++++++++++++++++++++++++++++++++++ synapse/server.py | 7 +- 3 files changed, 269 insertions(+), 245 deletions(-) create mode 100644 synapse/handlers/pagination.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c1489cd066..ba3c4642bc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from twisted.python.failure import Failure from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError @@ -32,49 +31,17 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master -from synapse.types import RoomAlias, RoomStreamToken, UserID -from synapse.util.async import Limiter, ReadWriteLock +from synapse.types import RoomAlias, UserID +from synapse.util.async import Limiter from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.stringutils import random_string -from synapse.visibility import filter_events_for_client from ._base import BaseHandler logger = logging.getLogger(__name__) -class PurgeStatus(object): - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - - Attributes: - status (int): Tracks whether this request has completed. One of - STATUS_{ACTIVE,COMPLETE,FAILED} - """ - - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - def __init__(self): - self.status = PurgeStatus.STATUS_ACTIVE - - def asdict(self): - return { - "status": PurgeStatus.STATUS_TEXT[self.status] - } - - class MessageHandler(object): """Contains some read only APIs to get state about a room """ @@ -189,211 +156,6 @@ class MessageHandler(object): }) -class PaginationHandler(object): - """Handles pagination and purge history requests. - - These are in the same handler due to the fact we need to block clients - paginating during a purge. - """ - - def __init__(self, hs): - self.hs = hs - self.auth = hs.get_auth() - self.store = hs.get_datastore() - self.clock = hs.get_clock() - - self.pagination_lock = ReadWriteLock() - self._purges_in_progress_by_room = set() - # map from purge id to PurgeStatus - self._purges_by_id = {} - - def start_purge_history(self, room_id, token, - delete_local_events=False): - """Start off a history purge on a room. - - Args: - room_id (str): The room to purge from - - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - str: unique ID for this purge transaction. - """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, - "History purge already in progress for %s" % (room_id, ), - ) - - purge_id = random_string(16) - - # we log the purge_id here so that it can be tied back to the - # request id in the log lines. - logger.info("[purge] starting purge_id %s", purge_id) - - self._purges_by_id[purge_id] = PurgeStatus() - run_in_background( - self._purge_history, - purge_id, room_id, token, delete_local_events, - ) - return purge_id - - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, - delete_local_events): - """Carry out a history purge on a room. - - Args: - purge_id (str): The id for this purge - room_id (str): The room to purge from - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - Deferred - """ - self._purges_in_progress_by_room.add(room_id) - try: - with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, token, delete_local_events, - ) - logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE - except Exception: - logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the purge from the list 24 hours after it completes - def clear_purge(): - del self._purges_by_id[purge_id] - self.hs.get_reactor().callLater(24 * 3600, clear_purge) - - def get_purge_status(self, purge_id): - """Get the current status of an active purge - - Args: - purge_id (str): purge_id returned by start_purge_history - - Returns: - PurgeStatus|None - """ - return self._purges_by_id.get(purge_id) - - @defer.inlineCallbacks - def get_messages(self, requester, room_id=None, pagin_config=None, - as_client_event=True, event_filter=None): - """Get messages in a room. - - Args: - requester (Requester): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - as_client_event (bool): True to get events in client-server format. - event_filter (Filter): Filter to apply to results or None - Returns: - dict: Pagination API results - """ - user_id = requester.user.to_string() - - if pagin_config.from_token: - room_token = pagin_config.from_token.room_key - else: - pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token_for_room( - room_id=room_id - ) - ) - room_token = pagin_config.from_token.room_key - - room_token = RoomStreamToken.parse(room_token) - - pagin_config.from_token = pagin_config.from_token.copy_and_replace( - "room_key", str(room_token) - ) - - source_config = pagin_config.get_source_config("room") - - with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self.auth.check_in_room_or_world_readable( - room_id, user_id - ) - - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token( - room_id, room_token.stream - ) - - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id - ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo - ) - - events, next_key = yield self.store.paginate_room_events( - room_id=room_id, - from_key=source_config.from_key, - to_key=source_config.to_key, - direction=source_config.direction, - limit=source_config.limit, - event_filter=event_filter, - ) - - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) - - if not events: - defer.returnValue({ - "chunk": [], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - }) - - if event_filter: - events = event_filter.filter(events) - - events = yield filter_events_for_client( - self.store, - user_id, - events, - is_peeking=(member_event_id is None), - ) - - time_now = self.clock.time_msec() - - chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - } - - defer.returnValue(chunk) - - class EventCreationHandler(object): def __init__(self, hs): self.hs = hs diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py new file mode 100644 index 0000000000..b2849783ed --- /dev/null +++ b/synapse/handlers/pagination.py @@ -0,0 +1,265 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2017 - 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from twisted.internet import defer +from twisted.python.failure import Failure + +from synapse.api.constants import Membership +from synapse.api.errors import SynapseError +from synapse.events.utils import serialize_event +from synapse.types import RoomStreamToken +from synapse.util.async import ReadWriteLock +from synapse.util.logcontext import run_in_background +from synapse.util.stringutils import random_string +from synapse.visibility import filter_events_for_client + +logger = logging.getLogger(__name__) + + +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + +class PaginationHandler(object): + """Handles pagination and purge history requests. + + These are in the same handler due to the fact we need to block clients + paginating during a purge. + """ + + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + self.pagination_lock = ReadWriteLock() + self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} + + def start_purge_history(self, room_id, token, + delete_local_events=False): + """Start off a history purge on a room. + + Args: + room_id (str): The room to purge from + + token (str): topological token to delete events before + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + str: unique ID for this purge transaction. + """ + if room_id in self._purges_in_progress_by_room: + raise SynapseError( + 400, + "History purge already in progress for %s" % (room_id, ), + ) + + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, token, delete_local_events, + ) + return purge_id + + @defer.inlineCallbacks + def _purge_history(self, purge_id, room_id, token, + delete_local_events): + """Carry out a history purge on a room. + + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + token (str): topological token to delete events before + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ + self._purges_in_progress_by_room.add(room_id) + try: + with (yield self.pagination_lock.write(room_id)): + yield self.store.purge_history( + room_id, token, delete_local_events, + ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED + finally: + self._purges_in_progress_by_room.discard(room_id) + + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + self.hs.get_reactor().callLater(24 * 3600, clear_purge) + + def get_purge_status(self, purge_id): + """Get the current status of an active purge + + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) + + @defer.inlineCallbacks + def get_messages(self, requester, room_id=None, pagin_config=None, + as_client_event=True, event_filter=None): + """Get messages in a room. + + Args: + requester (Requester): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + as_client_event (bool): True to get events in client-server format. + event_filter (Filter): Filter to apply to results or None + Returns: + dict: Pagination API results + """ + user_id = requester.user.to_string() + + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: + pagin_config.from_token = ( + yield self.hs.get_event_sources().get_current_token_for_room( + room_id=room_id + ) + ) + room_token = pagin_config.from_token.room_key + + room_token = RoomStreamToken.parse(room_token) + + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + with (yield self.pagination_lock.read(room_id)): + membership, member_event_id = yield self.auth.check_in_room_or_world_readable( + room_id, user_id + ) + + if source_config.direction == 'b': + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) + + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, max_topo + ) + + events, next_key = yield self.store.paginate_room_events( + room_id=room_id, + from_key=source_config.from_key, + to_key=source_config.to_key, + direction=source_config.direction, + limit=source_config.limit, + event_filter=event_filter, + ) + + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) + + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + if event_filter: + events = event_filter.filter(events) + + events = yield filter_events_for_client( + self.store, + user_id, + events, + is_peeking=(member_event_id is None), + ) + + time_now = self.clock.time_msec() + + chunk = { + "chunk": [ + serialize_event(e, time_now, as_client_event) + for e in events + ], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) diff --git a/synapse/server.py b/synapse/server.py index a24ea158df..83eacccc29 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -52,11 +52,8 @@ from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler -from synapse.handlers.message import ( - EventCreationHandler, - MessageHandler, - PaginationHandler, -) +from synapse.handlers.message import EventCreationHandler, MessageHandler +from synapse.handlers.pagination import PaginationHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler -- cgit 1.5.1 From 3132b89f12f0386558045683ad198f090b0e2c90 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 21 Jul 2018 15:47:18 +1000 Subject: Make the rest of the .iterwhatever go away (#3562) --- changelog.d/3562.misc | 0 synapse/app/homeserver.py | 6 ++++-- synapse/app/synctl.py | 4 +++- synapse/events/snapshot.py | 4 +++- synapse/handlers/federation.py | 18 +++++++++--------- synapse/state.py | 6 +++--- synapse/visibility.py | 19 ++++++++++--------- tests/test_federation.py | 3 +-- 8 files changed, 33 insertions(+), 27 deletions(-) create mode 100644 changelog.d/3562.misc (limited to 'synapse/handlers') diff --git a/changelog.d/3562.misc b/changelog.d/3562.misc new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 14e6dca522..2ad1beb8d8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ import logging import os import sys +from six import iteritems + from twisted.application import service from twisted.internet import defer, reactor from twisted.web.resource import EncodingResourceWrapper, NoResource @@ -442,7 +444,7 @@ def run(hs): stats["total_nonbridged_users"] = total_nonbridged_users daily_user_type_results = yield hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.iteritems(): + for name, count in iteritems(daily_user_type_results): stats["daily_user_type_" + name] = count room_count = yield hs.get_datastore().get_room_count() @@ -453,7 +455,7 @@ def run(hs): stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() - for name, count in r30_results.iteritems(): + for name, count in iteritems(r30_results): stats["r30_users_" + name] = count daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 68acc15a9a..d658f967ba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -25,6 +25,8 @@ import subprocess import sys import time +from six import iteritems + import yaml SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] @@ -173,7 +175,7 @@ def main(): os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) cache_factors = config.get("synctl_cache_factors", {}) - for cache_name, factor in cache_factors.iteritems(): + for cache_name, factor in iteritems(cache_factors): os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) worker_configfiles = [] diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb5946..f83a1581a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + from frozendict import frozendict from twisted.internet import defer @@ -159,7 +161,7 @@ def _encode_state_dict(state_dict): return [ (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() + for (etype, state_key), v in iteritems(state_dict) ] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 65f6041b10..a6d391c4e8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,8 +21,8 @@ import logging import sys import six -from six import iteritems -from six.moves import http_client +from six import iteritems, itervalues +from six.moves import http_client, zip from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json @@ -731,7 +731,7 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.iteritems() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -748,7 +748,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.iteritems(), key=lambda d: d[1]) + return sorted(joined_domains.items(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -811,7 +811,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.iterkeys()) + event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -827,15 +827,15 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.itervalues() for e_id in ids.itervalues()], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.iteritems() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.iteritems() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: @@ -1515,7 +1515,7 @@ class FederationHandler(BaseHandler): yield self.store.persist_events( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) diff --git a/synapse/state.py b/synapse/state.py index 15a593d41c..504caae2f7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,7 +18,7 @@ import hashlib import logging from collections import namedtuple -from six import iteritems, itervalues +from six import iteritems, iterkeys, itervalues from frozendict import frozendict @@ -647,7 +647,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): for event_id in event_ids ) if event_map is not None: - needed_events -= set(event_map.iterkeys()) + needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d conflicted events", len(needed_events)) @@ -668,7 +668,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(event_map.iterkeys()) + new_needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d auth events", len(new_needed_events)) diff --git a/synapse/visibility.py b/synapse/visibility.py index 9b97ea2b83..ba0499a022 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,11 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging import operator -import six +from six import iteritems, itervalues +from six.moves import map from twisted.internet import defer @@ -221,7 +222,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, return event # check each event: gives an iterable[None|EventBase] - filtered_events = itertools.imap(allowed, events) + filtered_events = map(allowed, events) # remove the None entries filtered_events = filter(operator.truth, filtered_events) @@ -261,7 +262,7 @@ def filter_events_for_server(store, server_name, events): # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in state.itervalues(): + for ev in itervalues(state): if ev.type != EventTypes.Member: continue try: @@ -295,7 +296,7 @@ def filter_events_for_server(store, server_name, events): ) visibility_ids = set() - for sids in event_to_state_ids.itervalues(): + for sids in itervalues(event_to_state_ids): hist = sids.get((EventTypes.RoomHistoryVisibility, "")) if hist: visibility_ids.add(hist) @@ -308,7 +309,7 @@ def filter_events_for_server(store, server_name, events): event_map = yield store.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in event_map.itervalues() + for e in itervalues(event_map) ) if all_open: @@ -346,7 +347,7 @@ def filter_events_for_server(store, server_name, events): # state_key_to_event_id_set = { e - for key_to_eid in six.itervalues(event_to_state_ids) + for key_to_eid in itervalues(event_to_state_ids) for e in key_to_eid.items() } @@ -369,10 +370,10 @@ def filter_events_for_server(store, server_name, events): event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() + for key, inner_e_id in iteritems(key_to_eid) if inner_e_id in event_map } - for e_id, key_to_eid in event_to_state_ids.iteritems() + for e_id, key_to_eid in iteritems(event_to_state_ids) } defer.returnValue([ diff --git a/tests/test_federation.py b/tests/test_federation.py index 159a136971..f40ff29b52 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -137,7 +137,6 @@ class MessageAcceptTests(unittest.TestCase): ) self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") - @unittest.DEBUG def test_cant_hide_past_history(self): """ If you send a message, you must be able to provide the direct @@ -178,7 +177,7 @@ class MessageAcceptTests(unittest.TestCase): for x, y in d.items() if x == ("m.room.member", "@us:test") ], - "auth_chain_ids": d.values(), + "auth_chain_ids": list(d.values()), } ) -- cgit 1.5.1 From e42510ba635b3e4d83215e4f5634ca51411996e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:00:22 +0100 Subject: Use new getters --- synapse/api/auth.py | 6 ++++-- synapse/handlers/_base.py | 3 ++- synapse/handlers/federation.py | 23 ++++++++++++++++------- synapse/handlers/message.py | 26 ++++++++++++++++---------- synapse/handlers/room_member.py | 9 ++++++--- synapse/push/bulk_push_rule_evaluator.py | 7 ++++--- synapse/storage/events.py | 2 +- synapse/storage/push_rule.py | 7 +++++-- synapse/storage/roommember.py | 7 +++++-- 9 files changed, 59 insertions(+), 31 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bc629832d9..535bdb449d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -65,8 +65,9 @@ class Auth(object): @defer.inlineCallbacks def check_from_context(self, event, context, do_sig_check=True): + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -544,7 +545,8 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): - auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids) + prev_state_ids = yield context.get_prev_state_ids(self.store) + auth_ids = yield self.compute_auth_events(builder, prev_state_ids) auth_events_entries = yield self.store.add_event_hashes( auth_ids diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index b6a8b3aa3b..704181d2d3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -112,8 +112,9 @@ class BaseHandler(object): guest_access = event.content.get("guest_access", "forbidden") if guest_access != "can_join": if context: + current_state_ids = yield context.get_current_state_ids(self.store) current_state = yield self.store.get_events( - list(context.current_state_ids.values()) + list(current_state_ids.values()) ) else: current_state = yield self.state_handler.get_current_state( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a6d391c4e8..98dd4a7fd1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -486,7 +486,10 @@ class FederationHandler(BaseHandler): # joined the room. Don't bother if the user is just # changing their profile info. newly_joined = True - prev_state_id = context.prev_state_ids.get( + + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_state_id = prev_state_ids.get( (event.type, event.state_key) ) if prev_state_id: @@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - state_ids = list(context.prev_state_ids.values()) + prev_state_ids = yield context.get_prev_state_ids(self.store) + + state_ids = list(prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(list(context.prev_state_ids.values())) + state = yield self.store.get_events(list(prev_state_ids.values())) defer.returnValue({ "state": list(state.values()), @@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler): ) if not auth_events: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler): break if do_resolution: + prev_state_ids = yield context.get_prev_state_ids(self.store) # 1. Get what we think is the auth chain. auth_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids + event, prev_state_ids ) local_auth_chain = yield self.store.get_auth_chain( auth_ids, include_given=True @@ -2222,7 +2229,8 @@ class FederationHandler(BaseHandler): event.content["third_party_invite"]["signed"]["token"] ) original_invite = None - original_invite_id = context.prev_state_ids.get(key) + prev_state_ids = yield context.get_prev_state_ids(self.store) + original_invite_id = prev_state_ids.get(key) if original_invite_id: original_invite = yield self.store.get_event( original_invite_id, allow_none=True @@ -2264,7 +2272,8 @@ class FederationHandler(BaseHandler): signed = event.content["third_party_invite"]["signed"] token = signed["token"] - invite_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + invite_event_id = prev_state_ids.get( (EventTypes.ThirdPartyInvite, token,) ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index abc07ea87c..c4bcd9018b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -630,7 +630,8 @@ class EventCreationHandler(object): If so, returns the version of the event in context. Otherwise, returns None. """ - prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_event_id = prev_state_ids.get((event.type, event.state_key)) prev_event = yield self.store.get_event(prev_event_id, allow_none=True) if not prev_event: return @@ -752,8 +753,8 @@ class EventCreationHandler(object): event = builder.build() logger.debug( - "Created event %s with state: %s", - event.event_id, context.prev_state_ids, + "Created event %s", + event.event_id, ) defer.returnValue( @@ -884,9 +885,11 @@ class EventCreationHandler(object): e.sender == event.sender ) + current_state_ids = yield context.get_current_state_ids(self.store) + state_to_include_ids = [ e_id - for k, e_id in iteritems(context.current_state_ids) + for k, e_id in iteritems(current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -922,8 +925,9 @@ class EventCreationHandler(object): ) if event.type == EventTypes.Redaction: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -943,11 +947,13 @@ class EventCreationHandler(object): "You don't have permission to redact events" ) - if event.type == EventTypes.Create and context.prev_state_ids: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) + if event.type == EventTypes.Create: + prev_state_ids = yield context.get_prev_state_ids(self.store) + if prev_state_ids: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 00f2e279bc..a832d91809 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -201,7 +201,9 @@ class RoomMemberHandler(object): ratelimit=ratelimit, ) - prev_member_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_member_event_id = prev_state_ids.get( (EventTypes.Member, target.to_string()), None ) @@ -496,9 +498,10 @@ class RoomMemberHandler(object): if prev_event is not None: return + prev_state_ids = yield context.get_prev_state_ids(self.store) if event.membership == Membership.JOIN: if requester.is_guest: - guest_can_join = yield self._can_guest_join(context.prev_state_ids) + guest_can_join = yield self._can_guest_join(prev_state_ids) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. @@ -517,7 +520,7 @@ class RoomMemberHandler(object): ratelimit=ratelimit, ) - prev_member_event_id = context.prev_state_ids.get( + prev_member_event_id = prev_state_ids.get( (EventTypes.Member, event.state_key), None ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index bb181d94ee..1d14d3639c 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def _get_power_levels_and_sender_level(self, event, context): - pl_event_id = context.prev_state_ids.get(POWER_KEY) + prev_state_ids = yield context.get_prev_state_ids(self.store) + pl_event_id = prev_state_ids.get(POWER_KEY) if pl_event_id: # fastpath: if there's a power level event, that's all we need, and # not having a power level event is an extreme edge case @@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object): auth_events = {POWER_KEY: pl_event} else: auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=False, + event, prev_state_ids, for_verification=False, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -304,7 +305,7 @@ class RulesForRoom(object): push_rules_delta_state_cache_metric.inc_hits() else: - current_state_ids = context.current_state_ids + current_state_ids = yield context.get_current_state_ids(self.store) push_rules_delta_state_cache_metric.inc_misses() push_rules_state_size_counter.inc(len(current_state_ids)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4ff0fdc4ab..bf4f3ee92a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -549,7 +549,7 @@ class EventsStore(EventsWorkerStore): if ctx.state_group in state_groups_map: continue - state_groups_map[ctx.state_group] = ctx.current_state_ids + state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self) # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index be655d287b..af564b1b4e 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -186,6 +186,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, defer.returnValue(results) + @defer.inlineCallbacks def bulk_get_push_rules_for_room(self, event, context): state_group = context.state_group if not state_group: @@ -195,9 +196,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._bulk_get_push_rules_for_room( - event.room_id, state_group, context.current_state_ids, event=event + current_state_ids = yield context.get_current_state_ids(self) + result = yield self._bulk_get_push_rules_for_room( + event.room_id, state_group, current_state_ids, event=event ) + defer.returnValue(result) @cachedInlineCallbacks(num_args=2, cache_context=True) def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 02a802bed9..a27702a7a0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): defer.returnValue(user_who_share_room) + @defer.inlineCallbacks def get_joined_users_from_context(self, event, context): state_group = context.state_group if not state_group: @@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_users_from_context( - event.room_id, state_group, context.current_state_ids, + current_state_ids = yield context.get_current_state_ids(self) + result = yield self._get_joined_users_from_context( + event.room_id, state_group, current_state_ids, event=event, context=context, ) + defer.returnValue(result) def get_joined_users_from_state(self, room_id, state_entry): state_group = state_entry.state_group -- cgit 1.5.1 From 027bc01a1bc254fe08140c6e91a9fb945b08486f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:02:09 +0100 Subject: Add support for updating state --- synapse/events/snapshot.py | 19 +++++++++++++++++++ synapse/handlers/federation.py | 32 +++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f9568638a1..b090751bf1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -228,6 +228,25 @@ class EventContext(object): else: self._prev_state_ids = self._current_state_ids + @defer.inlineCallbacks + def update_state(self, state_group, prev_state_ids, current_state_ids, + delta_ids): + """Replace the state in the context + """ + + # We need to make sure we wait for any ongoing fetching of state + # to complete so that the updated state doesn't get clobbered + if self._fetching_state_deferred: + yield make_deferred_yieldable(self._fetching_state_deferred) + + self.state_group = state_group + self._prev_state_ids = prev_state_ids + self._current_state_ids = current_state_ids + self.delta_ids = delta_ids + + # We need to ensure that that we've marked as having fetched the state + self._fetching_state_deferred = defer.succeed(None) + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 98dd4a7fd1..14654d59f1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1975,21 +1975,35 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update(state_updates) + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids.update(state_updates) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ + delta_ids = dict(context.delta_ids) + delta_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ k: a.event_id for k, a in iteritems(auth_events) }) - context.state_group = yield self.store.store_state_group( + + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + delta_ids=delta_ids, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + delta_ids=delta_ids, ) @defer.inlineCallbacks -- cgit 1.5.1 From 0faa3223cdf996aa18376a7420a43061a6691638 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 16:28:00 +0100 Subject: Fix missing attributes on workers. This was missed during the transition from attribute to getter for getting state from context. --- synapse/events/snapshot.py | 10 ++++++---- synapse/handlers/message.py | 5 +++-- synapse/replication/http/send_event.py | 7 +++++-- 3 files changed, 14 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e31eceb921..a59064b416 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -110,7 +110,8 @@ class EventContext(object): return context - def serialize(self, event): + @defer.inlineCallbacks + def serialize(self, event, store): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -126,11 +127,12 @@ class EventContext(object): # the prev_state_ids, so if we're a state event we include the event # id that we replaced in the state. if event.is_state(): - prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) else: prev_state_id = None - return { + defer.returnValue({ "prev_state_id": prev_state_id, "event_type": event.type, "event_state_key": event.state_key if event.is_state() else None, @@ -140,7 +142,7 @@ class EventContext(object): "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None - } + }) @staticmethod def deserialize(store, input): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c4bcd9018b..7571975c22 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -807,8 +807,9 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( - self.hs.get_clock(), - self.http_client, + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, requester=requester, diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 2eede54792..5227bc333d 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -34,12 +34,13 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(clock, client, host, port, requester, event, context, +def send_event_to_master(clock, store, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: clock (synapse.util.Clock) + store (DataStore) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context, host, port, event.event_id, ) + serialized_context = yield context.serialize(event, store) + payload = { "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(event), + "context": serialized_context, "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], -- cgit 1.5.1 From 254fb430d1662c93c56c2abbd6984e07fb04c36b Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 19:21:20 +0100 Subject: incorporate review --- synapse/handlers/sync.py | 67 +++++++++++++++++++----------------------------- synapse/storage/state.py | 20 ++++++--------- 2 files changed, 35 insertions(+), 52 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b597f94cf6..5689ad2f58 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -543,17 +543,6 @@ class SyncHandler(object): state_ids = current_state_ids - # track the membership state events as of the beginning of this - # timeline sequence, so they can be filtered out of the state - # if we are lazy loading members. - if lazy_load_members: - member_state_ids = { - t: state_ids[t] - for t in state_ids if t[0] == EventTypes.Member - } - else: - member_state_ids = {} - timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -562,9 +551,9 @@ class SyncHandler(object): state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, - timeline_start_members=member_state_ids, previous={}, current=current_state_ids, + lazy_load_members=lazy_load_members, ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( @@ -582,37 +571,27 @@ class SyncHandler(object): filtered_types=filtered_types, ) - # track the membership state events as of the beginning of this - # timeline sequence, so they can be filtered out of the state - # if we are lazy loading members. - if lazy_load_members: - # TODO: optionally filter out redundant membership events at this - # point, to stop repeatedly sending members in every /sync as if - # the client isn't tracking them. - # When implement, this should filter using event_ids (not mxids). - # In practice, limited syncs are - # relatively rare so it's not a total disaster to send redundant - # members down at this point. Redundant members are ones which - # repeatedly get sent down /sync because we don't know if the client - # is caching them or not. - member_state_ids = { - t: state_at_timeline_start[t] - for t in state_at_timeline_start if t[0] == EventTypes.Member - } - else: - member_state_ids = {} - timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() } + # TODO: optionally filter out redundant membership events at this + # point, to stop repeatedly sending members in every /sync as if + # the client isn't tracking them. + # When implemented, this should filter using event_ids (not mxids). + # In practice, limited syncs are + # relatively rare so it's not a total disaster to send redundant + # members down at this point. Redundant members are ones which + # repeatedly get sent down /sync because we don't know if the client + # is caching them or not. + state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, - timeline_start_members=member_state_ids, previous=state_at_previous_sync, current=current_state_ids, + lazy_load_members=lazy_load_members, ) else: state_ids = {} @@ -1536,16 +1515,14 @@ def _action_has_highlight(actions): return False -def _calculate_state(timeline_contains, timeline_start, timeline_start_members, - previous, current): +def _calculate_state( + timeline_contains, timeline_start, previous, current, lazy_load_members, +): """Works out what state to include in a sync response. Args: timeline_contains (dict): state in the timeline timeline_start (dict): state at the start of the timeline - timeline_start_members (dict): state at the start of the timeline - for room members who participate in this chunk of timeline. - Should always be a subset of timeline_start. previous (dict): state at the end of the previous sync (or empty dict if this is an initial sync) current (dict): state at the end of the timeline @@ -1565,11 +1542,21 @@ def _calculate_state(timeline_contains, timeline_start, timeline_start_members, c_ids = set(e for e in current.values()) ts_ids = set(e for e in timeline_start.values()) - tsm_ids = set(e for e in timeline_start_members.values()) p_ids = set(e for e in previous.values()) tc_ids = set(e for e in timeline_contains.values()) - state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | tsm_ids + # track the membership events in the state as of the start of the timeline + # so we can add them back in to the state if we're lazyloading. We don't + # add them into state if they're already contained in the timeline. + if lazy_load_members: + ll_ids = set( + e for t, e in timeline_start.iteritems() + if t[0] == EventTypes.Member and e not in tc_ids + ) + else: + ll_ids = set() + + state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | ll_ids return { event_id_to_key[e]: e for e in state_ids diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f09be7172d..40ca8bd2a2 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -191,10 +191,10 @@ class StateGroupWorkerStore(SQLBaseStore): Args: groups(list[int]): list of state group IDs to query - types(list[str|None, str|None])|None: List of 2-tuples of the form + types (Iterable[str, str|None]|None): list of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the `type`. If None, all types are returned. - filtered_types(list[str]|None): Only apply filtering via `types` to this + filtered_types(Iterable[str]|None): Only apply filtering via `types` to this list of event types. Other types of events are returned unfiltered. If None, `types` filtering is applied to all events. @@ -207,19 +207,17 @@ class StateGroupWorkerStore(SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, filtered_types + self._get_state_groups_from_groups_txn, chunk, types, filtered_types, ) results.update(res) defer.returnValue(results) def _get_state_groups_from_groups_txn( - self, txn, groups, types=None, filtered_types=None + self, txn, groups, types=None, filtered_types=None, ): results = {group: {} for group in groups} - include_other_types = False if filtered_types is None else True - if types is not None: types = list(set(types)) # deduplicate types list @@ -269,7 +267,7 @@ class StateGroupWorkerStore(SQLBaseStore): for etype, state_key in types ] - if include_other_types: + if filtered_types is not None: # XXX: check whether this slows postgres down like a list of # ORs does too? unique_types = set(filtered_types) @@ -308,7 +306,7 @@ class StateGroupWorkerStore(SQLBaseStore): where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) - if include_other_types: + if filtered_types is not None: unique_types = set(filtered_types) where_clauses.append( "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" @@ -538,8 +536,6 @@ class StateGroupWorkerStore(SQLBaseStore): # tracks which of the requested types are missing from our cache missing_types = set() - include_other_types = False if filtered_types is None else True - for typ, state_key in types: key = (typ, state_key) @@ -562,7 +558,7 @@ class StateGroupWorkerStore(SQLBaseStore): def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return include_other_types and typ not in filtered_types + return filtered_types is not None and typ not in filtered_types if valid_state_keys is None: return True if state_key in valid_state_keys: @@ -598,7 +594,7 @@ class StateGroupWorkerStore(SQLBaseStore): Args: groups (iterable[int]): list of state groups for which we want to get the state. - types (None|iterable[(None, None|str)]): + types (None|iterable[(str, None|str)]): indicates the state type/keys required. If None, the whole state is fetched and returned. -- cgit 1.5.1 From c1f80effbe17b1572161cc50838e60b495fb45a4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 22:06:50 +0100 Subject: Handle delta_ids being None in _update_context_for_auth_events it's easier to create the new state group as a delta from the existing one. (There's an outside chance this will help with https://github.com/matrix-org/synapse/issues/3364) --- synapse/events/snapshot.py | 3 ++- synapse/handlers/federation.py | 13 ++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 189212b0fa..368b5f6ae4 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -249,7 +249,7 @@ class EventContext(object): @defer.inlineCallbacks def update_state(self, state_group, prev_state_ids, current_state_ids, - delta_ids): + prev_group, delta_ids): """Replace the state in the context """ @@ -260,6 +260,7 @@ class EventContext(object): self.state_group = state_group self._prev_state_ids = prev_state_ids + self.prev_group = prev_group self._current_state_ids = current_state_ids self.delta_ids = delta_ids diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14654d59f1..145c1a21d4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler): current_state_ids.update(state_updates) - if context.delta_ids is not None: - delta_ids = dict(context.delta_ids) - delta_ids.update(state_updates) - prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = dict(prev_state_ids) @@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) }) + # create a new state group as a delta from the existing one. + prev_group = context.state_group state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, current_state_ids=current_state_ids, ) @@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler): state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, ) @defer.inlineCallbacks -- cgit 1.5.1 From 8dff6e0322718ec9c446465c1e10ab331a417b8a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:37:17 +0100 Subject: Logcontext fixes Fix some random logcontext leaks. --- synapse/handlers/initial_sync.py | 28 +++++++++++++++------------- synapse/storage/events.py | 5 +++-- synapse/storage/pusher.py | 2 +- 3 files changed, 19 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index fb11716eb8..50b13d8820 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -387,19 +387,21 @@ class InitialSyncHandler(BaseHandler): receipts = [] defer.returnValue(receipts) - presence, receipts, (messages, token) = yield defer.gatherResults( - [ - run_in_background(get_presence), - run_in_background(get_receipts), - run_in_background( - self.store.get_recent_events_for_room, - room_id, - limit=limit, - end_token=now_token.room_key, - ) - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + presence, receipts, (messages, token) = yield make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, + room_id, + limit=limit, + end_token=now_token.room_key, + ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError), + ) messages = yield filter_events_for_client( self.store, user_id, messages, is_peeking=is_peeking, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c2910094d0..c06dbb3768 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -147,7 +147,8 @@ class _EventPeristenceQueue(object): # callbacks on the deferred. try: ret = yield per_item_callback(item) - item.deferred.callback(ret) + with PreserveLoggingContext(): + item.deferred.callback(ret) except Exception: item.deferred.errback() finally: diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index cc273a57b2..8443bd4c1b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore): ) if newly_inserted: - self.runInteraction( + yield self.runInteraction( "add_pusher", self._invalidate_cache_and_stream, self.get_if_user_has_pusher, (user_id,) -- cgit 1.5.1 From cf2d15c6a953d42207fb2c8fe5dc57ee7fdae7ce Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 24 Jul 2018 00:57:48 +0100 Subject: another couple of logcontext leaks --- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/initial_sync.py | 10 ++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ec9fe01a5a..ee41aed69e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,7 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -106,7 +107,9 @@ class ApplicationServicesHandler(object): yield self._check_user_exists(event.state_key) if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) + def start_scheduler(): + return self.scheduler.start().addErrback(log_failure) + run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True # Fork off pushes to these services diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 50b13d8820..40e7580a61 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler): try: if event.membership == Membership.JOIN: room_end_token = now_token.room_key - deferred_room_state = self.state_handler.get_current_state( - event.room_id + deferred_room_state = run_in_background( + self.state_handler.get_current_state, + event.room_id, ) elif event.membership == Membership.LEAVE: room_end_token = "s%d" % (event.stream_ordering,) - deferred_room_state = self.store.get_state_for_events( - [event.event_id], None + deferred_room_state = run_in_background( + self.store.get_state_for_events, + [event.event_id], None, ) deferred_room_state.addCallback( lambda states: states[event.event_id] -- cgit 1.5.1 From cd241d6bda01a761fbe1ca29727dacd918fb8975 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 12:39:40 +0100 Subject: incorporate more review --- synapse/handlers/sync.py | 12 +++++++++--- synapse/storage/state.py | 36 +++++++++--------------------------- tests/storage/test_state.py | 9 +++++++++ 3 files changed, 27 insertions(+), 30 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5689ad2f58..e5a2329d73 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1526,6 +1526,9 @@ def _calculate_state( previous (dict): state at the end of the previous sync (or empty dict if this is an initial sync) current (dict): state at the end of the timeline + lazy_load_members (bool): whether to return members from timeline_start + or not. assumes that timeline_start has already been filtered to + include only the members the client needs to know about. Returns: dict @@ -1545,9 +1548,12 @@ def _calculate_state( p_ids = set(e for e in previous.values()) tc_ids = set(e for e in timeline_contains.values()) - # track the membership events in the state as of the start of the timeline - # so we can add them back in to the state if we're lazyloading. We don't - # add them into state if they're already contained in the timeline. + # If we are lazyloading room members, we explicitly add the membership events + # for the senders in the timeline into the state block returned by /sync, + # as we may not have sent them to the client before. We find these membership + # events by filtering them out of timeline_start, which has already been filtered + # to only include membership events for the senders in the timeline. + if lazy_load_members: ll_ids = set( e for t, e in timeline_start.iteritems() diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f99d3871e4..1413a6f910 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -185,7 +185,7 @@ class StateGroupWorkerStore(SQLBaseStore): }) @defer.inlineCallbacks - def _get_state_groups_from_groups(self, groups, types, filtered_types=None): + def _get_state_groups_from_groups(self, groups, types): """Returns the state groups for a given set of groups, filtering on types of state events. @@ -194,9 +194,6 @@ class StateGroupWorkerStore(SQLBaseStore): types (Iterable[str, str|None]|None): list of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the `type`. If None, all types are returned. - filtered_types(Iterable[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. Returns: dictionary state_group -> (dict of (type, state_key) -> event id) @@ -207,14 +204,14 @@ class StateGroupWorkerStore(SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, filtered_types, + self._get_state_groups_from_groups_txn, chunk, types, ) results.update(res) defer.returnValue(results) def _get_state_groups_from_groups_txn( - self, txn, groups, types=None, filtered_types=None, + self, txn, groups, types=None, ): results = {group: {} for group in groups} @@ -266,17 +263,6 @@ class StateGroupWorkerStore(SQLBaseStore): ) for etype, state_key in types ] - - if filtered_types is not None: - # XXX: check whether this slows postgres down like a list of - # ORs does too? - unique_types = set(filtered_types) - clause_to_args.append( - ( - "AND type <> ? " * len(unique_types), - list(unique_types) - ) - ) else: # If types is None we fetch all the state, and so just use an # empty where clause with no extra args. @@ -306,13 +292,6 @@ class StateGroupWorkerStore(SQLBaseStore): where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) - if filtered_types is not None: - unique_types = set(filtered_types) - where_clauses.append( - "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" - ) - where_args.extend(list(unique_types)) - where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: where_clause = "" @@ -643,13 +622,13 @@ class StateGroupWorkerStore(SQLBaseStore): # cache. Hence, if we are doing a wildcard lookup, populate the # cache fully so that we can do an efficient lookup next time. - if types and any(k is None for (t, k) in types): + if filtered_types or (types and any(k is None for (t, k) in types)): types_to_fetch = None else: types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, filtered_types + missing_groups, types_to_fetch ) for group, group_state_dict in iteritems(group_to_state_dict): @@ -659,7 +638,10 @@ class StateGroupWorkerStore(SQLBaseStore): if types: for k, v in iteritems(group_state_dict): (typ, _) = k - if k in types or (typ, None) in types: + if ( + (k in types or (typ, None) in types) or + (filtered_types and typ not in filtered_types) + ): state_dict[k] = v else: state_dict.update(group_state_dict) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 8924ba9f7f..b2f314e9db 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -158,3 +158,12 @@ class StateStoreTestCase(tests.unittest.TestCase): (e2.type, e2.state_key): e2, (e3.type, e3.state_key): e3, }, state) + + state = yield self.store.get_state_for_event( + e5.event_id, [], filtered_types=[EventTypes.Member], + ) + + self.assertStateMapEqual({ + (e1.type, e1.state_key): e1, + (e2.type, e2.state_key): e2, + }, state) -- cgit 1.5.1 From eb1d911ab743e85154f7c4b2db8a954d152020dc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 13:40:49 +0100 Subject: rather than adding ll_ids, remove them from p_ids --- synapse/handlers/sync.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e5a2329d73..1422843af8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1553,16 +1553,17 @@ def _calculate_state( # as we may not have sent them to the client before. We find these membership # events by filtering them out of timeline_start, which has already been filtered # to only include membership events for the senders in the timeline. + # In practice, we can do this by removing them from the p_ids list. + # see https://github.com/matrix-org/synapse/pull/2970 + # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 if lazy_load_members: - ll_ids = set( + p_ids.difference_update( e for t, e in timeline_start.iteritems() - if t[0] == EventTypes.Member and e not in tc_ids + if t[0] == EventTypes.Member ) - else: - ll_ids = set() - state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | ll_ids + state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids return { event_id_to_key[e]: e for e in state_ids -- cgit 1.5.1 From 1a01a5b964d3ea373355684a91b9f7fd95726fbc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 14:03:15 +0100 Subject: clarify comment on p_ids --- synapse/handlers/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1422843af8..4ced3144c8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1553,7 +1553,8 @@ def _calculate_state( # as we may not have sent them to the client before. We find these membership # events by filtering them out of timeline_start, which has already been filtered # to only include membership events for the senders in the timeline. - # In practice, we can do this by removing them from the p_ids list. + # In practice, we can do this by removing them from the p_ids list, + # which is the list of relevant state we know we have already sent to the client. # see https://github.com/matrix-org/synapse/pull/2970 # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 -- cgit 1.5.1 From 8b8c4f34a336376610bf353f6aa5d71c5ef69980 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 16:46:30 +0100 Subject: Replace usage of get_current_toke with StreamToken.START This allows us to handle /context/ requests on the client_reader worker without having to pull in all the various stream handlers (e.g. precence, typing, pushers etc). The only thing the token gets used for is pagination, and that ignores everything but the room portion of the token. --- synapse/app/client_reader.py | 2 ++ synapse/handlers/room.py | 12 +++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 398bb36602..e2c91123db 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.directory import DirectoryStore @@ -58,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader") class ClientReaderSlavedStore( + SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, RoomStore, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6150b7e226..003b848c00 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -24,7 +24,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.errors import AuthError, Codes, StoreError, SynapseError -from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID +from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils from synapse.visibility import filter_events_for_client @@ -418,8 +418,6 @@ class RoomContextHandler(object): before_limit = math.floor(limit / 2.) after_limit = limit - before_limit - now_token = yield self.hs.get_event_sources().get_current_token() - users = yield self.store.get_users_in_room(room_id) is_peeking = user.to_string() not in users @@ -462,11 +460,15 @@ class RoomContextHandler(object): ) results["state"] = list(state[last_event_id].values()) - results["start"] = now_token.copy_and_replace( + # We use a dummy token here as we only care about the room portion of + # the token, which we replace. + token = StreamToken.START + + results["start"] = token.copy_and_replace( "room_key", results["start"] ).to_string() - results["end"] = now_token.copy_and_replace( + results["end"] = token.copy_and_replace( "room_key", results["end"] ).to_string() -- cgit 1.5.1 From 55acd6856cd86feb34edcbb5d9ce30e55a04e27f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 25 Jul 2018 10:34:48 +0100 Subject: Fix updating of cached remote profiles _update_remote_profile_cache was missing its `defer.inlineCallbacks`, so when it was called, would just return a generator object, without actually running any of the method body. --- synapse/handlers/profile.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 859f6d2b2e..43692b83a8 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer from synapse.api.errors import AuthError, CodeMessageException, SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler): if hs.config.worker_app is None: self.clock.looping_call( - self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, ) @defer.inlineCallbacks @@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler): room_id, str(e.message) ) + def _start_update_remote_profile_cache(self): + run_as_background_process( + "Update remote profile", self._update_remote_profile_cache, + ) + + @defer.inlineCallbacks def _update_remote_profile_cache(self): """Called periodically to check profiles of remote users we haven't checked in a while. -- cgit 1.5.1 From d8e65ed7e111243c08c0b87c9a49e7537c355074 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Wed, 25 Jul 2018 15:44:41 -0600 Subject: Fix a minor documentation typo in on_make_leave --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 145c1a21d4..49068c06d9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1279,7 +1279,7 @@ class FederationHandler(BaseHandler): @log_function def on_make_leave_request(self, room_id, user_id): """ We've received a /make_leave/ request, so we create a partial - join event for the room and return that. We do *not* persist or + leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ builder = self.event_builder_factory.new({ -- cgit 1.5.1 From 03751a64203b169cbf33b636b6d940ca6d414c31 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 26 Jul 2018 11:44:26 +0100 Subject: Fix some looping_call calls which were broken in #3604 It turns out that looping_call does check the deferred returned by its callback, and (at least in the case of client_ips), we were relying on this, and I broke it in #3604. Update run_as_background_process to return the deferred, and make sure we return it to clock.looping_call. --- changelog.d/3610.feature | 1 + synapse/app/homeserver.py | 4 ++-- synapse/groups/attestations.py | 2 +- synapse/handlers/profile.py | 2 +- synapse/metrics/background_process_metrics.py | 10 ++++++++-- synapse/rest/media/v1/media_repository.py | 2 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/storage/client_ips.py | 2 +- synapse/storage/devices.py | 2 +- synapse/storage/event_federation.py | 2 +- synapse/storage/event_push_actions.py | 4 ++-- synapse/storage/transactions.py | 4 +++- synapse/util/caches/expiringcache.py | 2 +- 13 files changed, 24 insertions(+), 15 deletions(-) create mode 100644 changelog.d/3610.feature (limited to 'synapse/handlers') diff --git a/changelog.d/3610.feature b/changelog.d/3610.feature new file mode 100644 index 0000000000..77a294cb9f --- /dev/null +++ b/changelog.d/3610.feature @@ -0,0 +1 @@ +Add metrics to track resource usage by background processes diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b7e7718290..57b815d777 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -429,7 +429,7 @@ def run(hs): stats_process = [] def start_phone_stats_home(): - run_as_background_process("phone_stats_home", phone_stats_home) + return run_as_background_process("phone_stats_home", phone_stats_home) @defer.inlineCallbacks def phone_stats_home(): @@ -502,7 +502,7 @@ def run(hs): ) def generate_user_daily_visit_stats(): - run_as_background_process( + return run_as_background_process( "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 4216af0a27..b04f4234ca 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -153,7 +153,7 @@ class GroupAttestionRenewer(object): defer.returnValue({}) def _start_renew_attestations(self): - run_as_background_process("renew_attestations", self._renew_attestations) + return run_as_background_process("renew_attestations", self._renew_attestations) @defer.inlineCallbacks def _renew_attestations(self): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 43692b83a8..cb5c6d587e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -256,7 +256,7 @@ class ProfileHandler(BaseHandler): ) def _start_update_remote_profile_cache(self): - run_as_background_process( + return run_as_background_process( "Update remote profile", self._update_remote_profile_cache, ) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 9d820e44a6..ce678d5f75 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -151,13 +151,19 @@ def run_as_background_process(desc, func, *args, **kwargs): This should be used to wrap processes which are fired off to run in the background, instead of being associated with a particular request. + It returns a Deferred which completes when the function completes, but it doesn't + follow the synapse logcontext rules, which makes it appropriate for passing to + clock.looping_call and friends (or for firing-and-forgetting in the middle of a + normal synapse inlineCallbacks function). + Args: desc (str): a description for this background process type func: a function, which may return a Deferred args: positional args for func kwargs: keyword args for func - Returns: None + Returns: Deferred which returns the result of func, but note that it does not + follow the synapse logcontext rules. """ @defer.inlineCallbacks def run(): @@ -176,4 +182,4 @@ def run_as_background_process(desc, func, *args, **kwargs): _background_processes[desc].remove(proc) with PreserveLoggingContext(): - run() + return run() diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 5b13378caa..174ad20123 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -106,7 +106,7 @@ class MediaRepository(object): ) def _start_update_recently_accessed(self): - run_as_background_process( + return run_as_background_process( "update_recently_accessed_media", self._update_recently_accessed, ) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 4efd5339a4..27aa0def2f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -373,7 +373,7 @@ class PreviewUrlResource(Resource): }) def _start_expire_url_cache_data(self): - run_as_background_process( + return run_as_background_process( "expire_url_cache_data", self._expire_url_cache_data, ) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 77ae10da3d..b8cefd43d6 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): to_update, ) - run_as_background_process( + return run_as_background_process( "update_client_ips", update, ) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 52dccb1507..c0943ecf91 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -712,7 +712,7 @@ class DeviceStore(SQLBaseStore): logger.info("Pruned %d device list outbound pokes", txn.rowcount) - run_as_background_process( + return run_as_background_process( "prune_old_outbound_device_pokes", self.runInteraction, "_prune_old_outbound_device_pokes", diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 65f2d19e20..f269ec6fb3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -549,7 +549,7 @@ class EventFederationStore(EventFederationWorkerStore): sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) - run_as_background_process( + return run_as_background_process( "delete_old_forward_extrem_cache", self.runInteraction, "_delete_old_forward_extrem_cache", diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4f44b0ad47..6840320641 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -460,7 +460,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ) def _find_stream_orderings_for_times(self): - run_as_background_process( + return run_as_background_process( "event_push_action_stream_orderings", self.runInteraction, "_find_stream_orderings_for_times", @@ -790,7 +790,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): """, (room_id, user_id, stream_ordering)) def _start_rotate_notifs(self): - run_as_background_process("rotate_notifs", self._rotate_notifs) + return run_as_background_process("rotate_notifs", self._rotate_notifs) @defer.inlineCallbacks def _rotate_notifs(self): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b4b479d94c..428e7fa36e 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -273,7 +273,9 @@ class TransactionStore(SQLBaseStore): return self.cursor_to_dict(txn) def _start_cleanup_transactions(self): - run_as_background_process("cleanup_transactions", self._cleanup_transactions) + return run_as_background_process( + "cleanup_transactions", self._cleanup_transactions, + ) def _cleanup_transactions(self): now = self._clock.time_msec() diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 465adc54a8..ce85b2ae11 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -64,7 +64,7 @@ class ExpiringCache(object): return def f(): - run_as_background_process( + return run_as_background_process( "prune_cache_%s" % self._cache_name, self._prune_cache, ) -- cgit 1.5.1 From a75231b507e025eaaa4f06d8932c04fa4e942d48 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 26 Jul 2018 22:51:30 +0100 Subject: Deduplicate redundant lazy-loaded members (#3331) * attempt at deduplicating lazy-loaded members as per the proposal; we can deduplicate redundant lazy-loaded members which are sent in the same sync sequence. we do this heuristically rather than requiring the client to somehow tell us which members it has chosen to cache, by instead caching the last N members sent to a client, and not sending them again. For now we hardcode N to 100. Each cache for a given (user,device) tuple is in turn cached for up to X minutes (to avoid the caches building up). For now we hardcode X to 30. * add include_redundant_members filter option & make it work * remove stale todo * add tests for _get_some_state_from_cache * incorporate review --- changelog.d/3331.feature | 1 + synapse/api/filtering.py | 9 +++++ synapse/handlers/sync.py | 87 ++++++++++++++++++++++++++++++++++-------------- 3 files changed, 72 insertions(+), 25 deletions(-) create mode 100644 changelog.d/3331.feature (limited to 'synapse/handlers') diff --git a/changelog.d/3331.feature b/changelog.d/3331.feature new file mode 100644 index 0000000000..e574b9bcc3 --- /dev/null +++ b/changelog.d/3331.feature @@ -0,0 +1 @@ +add support for the include_redundant_members filter param as per MSC1227 diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 7e767b9bf5..186831e118 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = { "lazy_load_members": { "type": "boolean" }, + "include_redundant_members": { + "type": "boolean" + }, } } @@ -267,6 +270,9 @@ class FilterCollection(object): def lazy_load_members(self): return self._room_state_filter.lazy_load_members() + def include_redundant_members(self): + return self._room_state_filter.include_redundant_members() + def filter_presence(self, events): return self._presence_filter.filter(events) @@ -426,6 +432,9 @@ class Filter(object): def lazy_load_members(self): return self.filter_json.get("lazy_load_members", False) + def include_redundant_members(self): + return self.filter_json.get("include_redundant_members", False) + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4ced3144c8..dff1f67dcb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership from synapse.push.clientformat import format_push_rules_for_user from synapse.types import RoomStreamToken from synapse.util.async import concurrently_execute +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func @@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) +# Store the cache that tracks which lazy-loaded members have been sent to a given +# client for no more than 30 minutes. +LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 + +# Remember the last 100 members we sent to a client for the purposes of +# avoiding redundantly sending the same lazy-loaded members to the client +LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 + SyncConfig = collections.namedtuple("SyncConfig", [ "user", @@ -182,6 +192,12 @@ class SyncHandler(object): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() + # ExpiringCache((User, Device)) -> LruCache(state_key => event_id) + self.lazy_loaded_members_cache = ExpiringCache( + "lazy_loaded_members_cache", self.clock, + max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, + ) + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -505,9 +521,13 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): types = None - lazy_load_members = sync_config.filter_collection.lazy_load_members() filtered_types = None + lazy_load_members = sync_config.filter_collection.lazy_load_members() + include_redundant_members = ( + sync_config.filter_collection.include_redundant_members() + ) + if lazy_load_members: # We only request state for the members needed to display the # timeline: @@ -523,6 +543,11 @@ class SyncHandler(object): # only apply the filtering to room members filtered_types = [EventTypes.Member] + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events if event.is_state() + } + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( @@ -543,11 +568,6 @@ class SyncHandler(object): state_ids = current_state_ids - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, @@ -571,21 +591,6 @@ class SyncHandler(object): filtered_types=filtered_types, ) - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - - # TODO: optionally filter out redundant membership events at this - # point, to stop repeatedly sending members in every /sync as if - # the client isn't tracking them. - # When implemented, this should filter using event_ids (not mxids). - # In practice, limited syncs are - # relatively rare so it's not a total disaster to send redundant - # members down at this point. Redundant members are ones which - # repeatedly get sent down /sync because we don't know if the client - # is caching them or not. - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, @@ -596,16 +601,48 @@ class SyncHandler(object): else: state_ids = {} if lazy_load_members: - # TODO: filter out redundant members based on their mxids (not their - # event_ids) at this point. We know we can do it based on mxid as this - # is an non-gappy incremental sync. - if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, filtered_types=filtered_types, ) + if lazy_load_members and not include_redundant_members: + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering state from %r...", state_ids) + state_ids = { + t: event_id + for t, event_id in state_ids.iteritems() + if cache.get(t[1]) != event_id + } + logger.debug("...to %r", state_ids) + + # add any member IDs we are about to send into our LruCache + for t, event_id in itertools.chain( + state_ids.items(), + timeline_state.items(), + ): + if t[0] == EventTypes.Member: + cache.set(t[1], event_id) + state = {} if state_ids: state = yield self.store.get_events(list(state_ids.values())) -- cgit 1.5.1