From b928909795a8df5cf888f5a22bd99ec82a6601ef Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 15:36:49 +0100 Subject: Fix incremental processor when there are no deltas. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'synapse/handlers/stats.py') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index f44adfc07b..6341c3244e 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -98,15 +98,16 @@ class StatsHandler(StateDeltasHandler): self.pos["state_delta_stream_id"] ) - logger.debug("Handling %d state deltas", len(deltas)) - yield self._handle_deltas(deltas) + if deltas: + logger.debug("Handling %d state deltas", len(deltas)) + yield self._handle_deltas(deltas) - self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] - yield self.store.update_stats_positions(self.pos) + self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + yield self.store.update_stats_positions(self.pos) - event_processing_positions.labels("stats").set( - self.pos["state_delta_stream_id"] - ) + event_processing_positions.labels("stats").set( + self.pos["state_delta_stream_id"] + ) # Then count deltas for total_events and total_event_bytes. with Measure(self.clock, "stats_total_events_and_bytes"): -- cgit 1.5.1 From 6f5e543901b19cd24d0eb104b0e40f4fda324fc5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 16:26:40 +0100 Subject: Various fixes Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 9 +++------ synapse/storage/stats.py | 5 +++-- 2 files changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/handlers/stats.py') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 6341c3244e..572da0a344 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -159,12 +159,9 @@ class StatsHandler(StateDeltasHandler): if event: event_content = event.content or {} - # We use stream_pos here rather than fetch by event_id as event_id - # may be None - stream_timestamp = yield self.store.get_received_ts_by_stream_pos( - stream_pos - ) - stream_timestamp = int(stream_timestamp) + # We can't afford for this time to stray into the past, so we count + # it as now. + stream_timestamp = int(self.clock.time_msec()) # All the values in this dict are deltas (RELATIVE changes) room_stats_delta = {} diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 7bf729c9d8..c022f620fc 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -948,10 +948,11 @@ class StatsStore(StateDeltasStore): src_row = self._simple_select_one_txn( txn, src_table, keyvalues, copy_columns ) + all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues} dest_current_row = self._simple_select_one_txn( txn, into_table, - keyvalues={ **keyvalues, **extra_dst_keyvalues }, + keyvalues=all_dest_keyvalues, retcols=list(chain(additive_relatives.keys(), copy_columns)), allow_none=True, ) @@ -968,7 +969,7 @@ class StatsStore(StateDeltasStore): else: for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val - self._simple_update_txn(txn, into_table, keyvalues, src_row) + self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row) def incremental_update_room_total_events_and_bytes(self, in_positions): """ -- cgit 1.5.1 From 21593febc349a83f1abca9a696919cc931d17227 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 16:32:20 +0100 Subject: Linting Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 1 - synapse/storage/stats.py | 1 - 2 files changed, 2 deletions(-) (limited to 'synapse/handlers/stats.py') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 572da0a344..67b7c3f513 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -130,7 +130,6 @@ class StatsHandler(StateDeltasHandler): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] - stream_pos = delta["stream_id"] logger.debug("Handling: %r %r, %s", typ, state_key, event_id) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4a982f61de..87d5dc5e98 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -661,7 +661,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - def get_statistics_for_subject(self, stats_type, stats_id, start, size=100): """ Get statistics for a given subject. -- cgit 1.5.1 From 02f759e0a213ab8e3d03895bcae3a173ef913bce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Sep 2019 11:33:03 +0100 Subject: Renamve get_room_state --- synapse/handlers/stats.py | 4 ++-- synapse/storage/stats.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers/stats.py') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 67b7c3f513..e849c38b85 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -258,7 +258,7 @@ class StatsHandler(StateDeltasHandler): is_newly_created = True elif typ == EventTypes.JoinRules: - old_room_state = yield self.store.get_room_state(room_id) + old_room_state = yield self.store.get_room_stats_state(room_id) yield self.store.update_room_state( room_id, {"join_rules": event_content.get("join_rule")} ) @@ -279,7 +279,7 @@ class StatsHandler(StateDeltasHandler): ) elif typ == EventTypes.RoomHistoryVisibility: - old_room_state = yield self.store.get_room_state(room_id) + old_room_state = yield self.store.get_room_stats_state(room_id) yield self.store.update_room_state( room_id, {"history_visibility": event_content.get("history_visibility")}, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index c3a8d6f5e7..f20d8ba8a4 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -709,9 +709,9 @@ class StatsStore(StateDeltasStore): return slice_list - def get_room_state(self, room_id): + def get_room_stats_state(self, room_id): """ - Returns the current room_state for a room. + Returns the current room_stats_state for a room. Args: room_id (str): The ID of the room to return state for. -- cgit 1.5.1