From bed10f9880068306be3fcdd15a51b1712c6159f2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 26 Aug 2016 14:54:30 +0100
Subject: Use state handler instead of get_users_in_room/get_joined_hosts

---
 synapse/storage/events.py | 1 -
 1 file changed, 1 deletion(-)
 (limited to 'synapse/storage/events.py')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 57e5005285..5cbe8c5978 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -393,7 +393,6 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self._get_current_state_for_key.invalidate_all)
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))

             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state

-- cgit 1.5.1

From 5dc2a702cf2415a55bab8061053c0b47dc21dc93 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 30 Aug 2016 16:54:40 +0100
Subject: Make _state_groups_id_gen a normal IdGenerator

---
 synapse/storage/__init__.py | 2 +-
 synapse/storage/events.py | 83 +++++++++++++++++++++------------------------
 synapse/storage/state.py | 3 --
 3 files changed, 40 insertions(+), 48 deletions(-)
 (limited to 'synapse/storage/events.py')

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4f4c723c5b..6c32773f25 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -115,7 +115,7 @@ class DataStore(RoomMemberStore, RoomStore,
         )
         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
-        self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
+        self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
         self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5cbe8c5978..bc1bc97e19 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -271,39 +271,35 @@ class EventsStore(SQLBaseStore):
                 len(events_and_contexts)
             )

-        state_group_id_manager = self._state_groups_id_gen.get_next_mult(
-            len(events_and_contexts)
-        )
         with stream_ordering_manager as stream_orderings:
-            with state_group_id_manager as state_group_ids:
-                for (event, context), stream, state_group_id in zip(
-                    events_and_contexts, stream_orderings, state_group_ids
-                ):
-                    event.internal_metadata.stream_ordering = stream
-                    # Assign a state group_id in case a new id is needed for
-                    # this context. In theory we only need to assign this
-                    # for contexts that have current_state and aren't outliers
-                    # but that make the code more complicated. Assigning an ID
-                    # per event only causes the state_group_ids to grow as fast
-                    # as the stream_ordering so in practise shouldn't be a problem.
-                    context.new_state_group_id = state_group_id
-
-                chunks = [
-                    events_and_contexts[x:x + 100]
-                    for x in xrange(0, len(events_and_contexts), 100)
-                ]
+            for (event, context), stream, in zip(
+                events_and_contexts, stream_orderings
+            ):
+                event.internal_metadata.stream_ordering = stream
+                # Assign a state group_id in case a new id is needed for
+                # this context. In theory we only need to assign this
+                # for contexts that have current_state and aren't outliers
+                # but that make the code more complicated. Assigning an ID
+                # per event only causes the state_group_ids to grow as fast
+                # as the stream_ordering so in practise shouldn't be a problem.
+                context.new_state_group_id = self._state_groups_id_gen.get_next()
+
+            chunks = [
+                events_and_contexts[x:x + 100]
+                for x in xrange(0, len(events_and_contexts), 100)
+            ]

-                for chunk in chunks:
-                    # We can't easily parallelize these since different chunks
-                    # might contain the same event. :(
-                    yield self.runInteraction(
-                        "persist_events",
-                        self._persist_events_txn,
-                        events_and_contexts=chunk,
-                        backfilled=backfilled,
-                        delete_existing=delete_existing,
-                    )
-                    persist_event_counter.inc_by(len(chunk))
+            for chunk in chunks:
+                # We can't easily parallelize these since different chunks
+                # might contain the same event. :(
+                yield self.runInteraction(
+                    "persist_events",
+                    self._persist_events_txn,
+                    events_and_contexts=chunk,
+                    backfilled=backfilled,
+                    delete_existing=delete_existing,
+                )
+                persist_event_counter.inc_by(len(chunk))

     @_retry_on_integrity_error
     @defer.inlineCallbacks
@@ -312,19 +308,18 @@ class EventsStore(SQLBaseStore):
                        delete_existing=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
-                with self._state_groups_id_gen.get_next() as state_group_id:
-                    event.internal_metadata.stream_ordering = stream_ordering
-                    context.new_state_group_id = state_group_id
-                    yield self.runInteraction(
-                        "persist_event",
-                        self._persist_event_txn,
-                        event=event,
-                        context=context,
-                        current_state=current_state,
-                        backfilled=backfilled,
-                        delete_existing=delete_existing,
-                    )
-                    persist_event_counter.inc()
+                event.internal_metadata.stream_ordering = stream_ordering
+                context.new_state_group_id = self._state_groups_id_gen.get_next()
+                yield self.runInteraction(
+                    "persist_event",
+                    self._persist_event_txn,
+                    event=event,
+                    context=context,
+                    current_state=current_state,
+                    backfilled=backfilled,
+                    delete_existing=delete_existing,
+                )
+                persist_event_counter.inc()
         except _RollbackButIsFineException:
             pass
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b1d461fef5..0442353287 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -526,6 +526,3 @@ class StateStore(SQLBaseStore):
         return self.runInteraction(
             "get_all_new_state_groups", get_all_new_state_groups_txn
         )
-
-    def get_state_stream_token(self):
-        return self._state_groups_id_gen.get_current_token()

-- cgit 1.5.1

From 1bb8ec296de4f6bf37b0b31f28bc0d7285f96ba0 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 31 Aug 2016 10:09:46 +0100
Subject: Generate state group ids in state layer

---
 synapse/state.py | 9 ++++++---
 synapse/storage/events.py | 10 +---------
 synapse/storage/state.py | 24 +++++++++++++++++-------
 3 files changed, 24 insertions(+), 19 deletions(-)
 (limited to 'synapse/storage/events.py')

diff --git a/synapse/state.py b/synapse/state.py
index daec983dc9..147416fd81 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -160,14 +160,14 @@ class StateHandler(object):
             else:
                 context.current_state_ids = {}
             context.prev_state_events = []
-            context.state_group = None
+            context.state_group = self.store.get_next_state_group()
             defer.returnValue(context)

         if old_state:
             context.current_state_ids = {
                 (s.type, s.state_key): s.event_id for s in old_state
             }
-            context.state_group = None
+            context.state_group = self.store.get_next_state_group()

             if event.is_state():
                 key = (event.type, event.state_key)
@@ -193,7 +193,10 @@ class StateHandler(object):
         group, curr_state = ret

         context.current_state_ids = curr_state
-        context.state_group = group if not event.is_state() else None
+        if event.is_state() or group is None:
+            context.state_group = self.store.get_next_state_group()
+        else:
+            context.state_group = group

         if event.is_state():
             key = (event.type, event.state_key)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bc1bc97e19..1a7d4c5199 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -276,13 +276,6 @@ class EventsStore(SQLBaseStore):
                 events_and_contexts, stream_orderings
             ):
                 event.internal_metadata.stream_ordering = stream
-                # Assign a state group_id in case a new id is needed for
-                # this context. In theory we only need to assign this
-                # for contexts that have current_state and aren't outliers
-                # but that make the code more complicated. Assigning an ID
-                # per event only causes the state_group_ids to grow as fast
-                # as the stream_ordering so in practise shouldn't be a problem.
-                context.new_state_group_id = self._state_groups_id_gen.get_next()

             chunks = [
                 events_and_contexts[x:x + 100]
                 for x in xrange(0, len(events_and_contexts), 100)
             ]
@@ -309,7 +302,6 @@ class EventsStore(SQLBaseStore):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 event.internal_metadata.stream_ordering = stream_ordering
-                context.new_state_group_id = self._state_groups_id_gen.get_next()
                 yield self.runInteraction(
                     "persist_event",
                     self._persist_event_txn,
@@ -523,7 +515,7 @@ class EventsStore(SQLBaseStore):
                 # Add an entry to the ex_outlier_stream table to replicate the
                 # change in outlier status to our workers.
                 stream_order = event.internal_metadata.stream_ordering
-                state_group_id = context.state_group or context.new_state_group_id
+                state_group_id = context.state_group
                 self._simple_insert_txn(
                     txn,
                     table="ex_outlier_stream",
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 0442353287..56bfdc0b55 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -83,6 +83,14 @@ class StateStore(SQLBaseStore):
             for group, event_id_map in group_to_ids.items()
         })

+    def _have_persisted_state_group_txn(self, txn, state_group):
+        txn.execute(
+            "SELECT count(*) FROM state_groups_state WHERE state_group = ?",
+            (state_group,)
+        )
+        row = txn.fetchone()
+        return row and row[0]
+
     def _store_mult_state_groups_txn(self, txn, events_and_contexts):
         state_groups = {}
         for event, context in events_and_contexts:
@@ -92,8 +100,10 @@ class StateStore(SQLBaseStore):
             if context.current_state_ids is None:
                 continue

-            if context.state_group is not None:
-                state_groups[event.event_id] = context.state_group
+            state_groups[event.event_id] = context.state_group
+
+            if self._have_persisted_state_group_txn(txn, context.state_group):
+                logger.info("Already persisted state_group: %r", context.state_group)
                 continue

             state_event_ids = dict(context.current_state_ids)
@@ -101,13 +111,11 @@ class StateStore(SQLBaseStore):
             if event.is_state():
                 state_event_ids[(event.type, event.state_key)] = event.event_id

-            state_group = context.new_state_group_id
-
             self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
-                    "id": state_group,
+                    "id": context.state_group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
@@ -118,7 +126,7 @@ class StateStore(SQLBaseStore):
                 table="state_groups_state",
                 values=[
                     {
-                        "state_group": state_group,
+                        "state_group": context.state_group,
                         "room_id": event.room_id,
                         "type": key[0],
                         "state_key": key[1],
@@ -127,7 +135,6 @@ class StateStore(SQLBaseStore):
                         "event_id": state_id,
                     }
                     for key, state_id in state_event_ids.items()
                 ],
             )
-            state_groups[event.event_id] = state_group

         self._simple_insert_many_txn(
             txn,
@@ -526,3 +533,6 @@ class StateStore(SQLBaseStore):
         return self.runInteraction(
             "get_all_new_state_groups", get_all_new_state_groups_txn
         )
+
+    def get_next_state_group(self):
+        return self._state_groups_id_gen.get_next()

-- cgit 1.5.1
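
Taken together, these patches move state-group id allocation out of the events store and into the state layer: StateHandler now asks the store for a fresh id up front via get_next_state_group(), the id generator becomes a plain IdGenerator (the stream-token accessor get_state_stream_token is removed), and _store_mult_state_groups_txn becomes idempotent by skipping any group that _have_persisted_state_group_txn reports as already written. The sketch below is a minimal, self-contained illustration of that pattern, not Synapse code: the two-table SQLite schema, the toy IdGenerator, and the helpers have_persisted_state_group / store_state_group are simplified stand-ins for the real storage layer.

import itertools
import sqlite3


class IdGenerator(object):
    """Toy stand-in for an id generator: hands out monotonically increasing
    integer ids, seeded from the largest id already present in the table."""

    def __init__(self, conn, table, column):
        cur = conn.execute("SELECT COALESCE(MAX(%s), 0) FROM %s" % (column, table))
        (current,) = cur.fetchone()
        self._counter = itertools.count(current + 1)

    def get_next(self):
        # A plain incrementing counter; no stream-token bookkeeping is needed,
        # which is why a stream-style generator is no longer required here.
        return next(self._counter)


def have_persisted_state_group(conn, state_group):
    # Mirrors the idea of _have_persisted_state_group_txn: a group counts as
    # persisted once any rows exist for it in state_groups_state.
    row = conn.execute(
        "SELECT count(*) FROM state_groups_state WHERE state_group = ?",
        (state_group,),
    ).fetchone()
    return bool(row and row[0])


def store_state_group(conn, state_group, room_id, event_id, state_ids):
    # Because the id was allocated up front (by the "state layer"), retrying
    # persistence with the same id is a no-op instead of minting a new group.
    if have_persisted_state_group(conn, state_group):
        return
    conn.execute(
        "INSERT INTO state_groups (id, room_id, event_id) VALUES (?, ?, ?)",
        (state_group, room_id, event_id),
    )
    conn.executemany(
        "INSERT INTO state_groups_state"
        " (state_group, room_id, type, state_key, event_id)"
        " VALUES (?, ?, ?, ?, ?)",
        [
            (state_group, room_id, etype, state_key, eid)
            for (etype, state_key), eid in state_ids.items()
        ],
    )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE state_groups"
        " (id INTEGER PRIMARY KEY, room_id TEXT, event_id TEXT)"
    )
    conn.execute(
        "CREATE TABLE state_groups_state"
        " (state_group INTEGER, room_id TEXT, type TEXT, state_key TEXT,"
        " event_id TEXT)"
    )

    id_gen = IdGenerator(conn, "state_groups", "id")

    # The state handler allocates the group id while building the context...
    group = id_gen.get_next()
    state_ids = {("m.room.member", "@alice:example.com"): "$join_event_id"}

    # ...and the storage layer can then be retried safely with the same id.
    store_state_group(conn, group, "!room:example.com", "$join_event_id", state_ids)
    store_state_group(conn, group, "!room:example.com", "$join_event_id", state_ids)

    (count,) = conn.execute("SELECT count(*) FROM state_groups").fetchone()
    print("state groups persisted: %d" % count)  # prints 1, not 2

Allocating the id before persistence is what makes the second store_state_group call a no-op: the storage layer can recognise a group it has already written instead of creating a duplicate, which appears to be the property the final patch relies on (see its "Already persisted state_group" log line) when the same event and context are persisted more than once.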