From 57b58e2174f120fb13fbe2f6d57e8647b69921ec Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 12:17:56 +0200 Subject: make imports local Signed-off-by: Adrian Tschira --- synapse/replication/tcp/resource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/replication/tcp/resource.py') diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 786c3fe864..a41af4fd6c 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -18,8 +18,8 @@ from twisted.internet import defer, reactor from twisted.internet.protocol import Factory -from streams import STREAMS_MAP, FederationStream -from protocol import ServerReplicationStreamProtocol +from .streams import STREAMS_MAP, FederationStream +from .protocol import ServerReplicationStreamProtocol from synapse.util.metrics import Measure, measure_func -- cgit 1.4.1 From 933bf2dd357842b0e7c3fc7a1111d89ab52f5329 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 13:19:12 +0200 Subject: replace some iteritems with six Signed-off-by: Adrian Tschira --- synapse/handlers/device.py | 14 ++++++++------ synapse/handlers/e2e_keys.py | 13 +++++++------ synapse/handlers/groups_local.py | 3 ++- synapse/handlers/presence.py | 15 ++++++++------- synapse/handlers/sync.py | 14 ++++++++------ synapse/handlers/user_directory.py | 3 ++- synapse/metrics/process_collector.py | 3 ++- synapse/push/bulk_push_rule_evaluator.py | 13 +++++++------ synapse/replication/tcp/protocol.py | 9 +++++---- synapse/replication/tcp/resource.py | 3 ++- synapse/rest/media/v1/media_repository.py | 3 ++- synapse/storage/client_ips.py | 6 ++++-- synapse/storage/devices.py | 9 +++++---- synapse/storage/end_to_end_keys.py | 6 ++++-- synapse/storage/event_push_actions.py | 4 +++- 15 files changed, 69 insertions(+), 49 deletions(-) (limited to 'synapse/replication/tcp/resource.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f7457a7082..31bd0e60c6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -26,6 +26,8 @@ from ._base import BaseHandler import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler): # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler): # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: if state_key != user_id: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 25aec624af..8a2d177539 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,6 +19,7 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer +from six import iteritems from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, @@ -92,7 +93,7 @@ class E2eKeysHandler(object): remote_queries_not_in_cache = {} if remote_queries: query_list = [] - for user_id, device_ids in remote_queries.iteritems(): + for user_id, device_ids in iteritems(remote_queries): if device_ids: query_list.extend((user_id, device_id) for device_id in device_ids) else: @@ -103,9 +104,9 @@ class E2eKeysHandler(object): query_list ) ) - for user_id, devices in remote_results.iteritems(): + for user_id, devices in iteritems(remote_results): user_devices = results.setdefault(user_id, {}) - for device_id, device in devices.iteritems(): + for device_id, device in iteritems(devices): keys = device.get("keys", None) device_display_name = device.get("device_display_name", None) if keys: @@ -250,9 +251,9 @@ class E2eKeysHandler(object): "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 977993e7d4..dcae083734 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -15,6 +15,7 @@ # limitations under the License. from twisted.internet import defer +from six import iteritems from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id @@ -449,7 +450,7 @@ class GroupsLocalHandler(object): results = {} failed_results = [] - for destination, dest_user_ids in destinations.iteritems(): + for destination, dest_user_ids in iteritems(destinations): try: r = yield self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids), diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 91218e40e6..b51f925220 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,6 +25,8 @@ The methods that define policy are: from twisted.internet import defer, reactor from contextlib import contextmanager +from six import itervalues, iteritems + from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState @@ -40,7 +42,6 @@ import synapse.metrics import logging - logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) @@ -526,7 +527,7 @@ class PresenceHandler(object): prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, ) - for prev_state in prev_states.itervalues() + for prev_state in itervalues(prev_states) ]) self.external_process_last_updated_ms.pop(process_id, None) @@ -549,14 +550,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { user_id: UserPresenceState.default(user_id) @@ -1044,7 +1045,7 @@ class PresenceEventSource(object): defer.returnValue((updates.values(), max_token)) else: defer.returnValue(([ - s for s in updates.itervalues() + s for s in itervalues(updates) if s.state != PresenceState.OFFLINE ], max_token)) @@ -1301,11 +1302,11 @@ def get_interested_remotes(store, states, state_handler): # hosts in those rooms. room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.iteritems(): + for room_id, states in iteritems(room_ids_to_states): hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.iteritems(): + for user_id, states in iteritems(users_to_states): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 263e42dded..d0c99c35e3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,6 +28,8 @@ import collections import logging import itertools +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -275,7 +277,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -294,7 +296,7 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -325,7 +327,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) recents = yield filter_events_for_client( self.store, @@ -382,7 +384,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) loaded_recents = yield filter_events_for_client( self.store, @@ -984,7 +986,7 @@ class SyncHandler(object): if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.itervalues() + joined_sync.timeline.events, itervalues(joined_sync.state) ) for event in it: if event.type == EventTypes.Member: @@ -1062,7 +1064,7 @@ class SyncHandler(object): newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.iteritems(): + for room_id, events in iteritems(mem_change_events_by_room_id): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 714f0195c8..cd0b7290f0 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -22,6 +22,7 @@ from synapse.util.metrics import Measure from synapse.util.async import sleep from synapse.types import get_localpart_from_id +from six import iteritems logger = logging.getLogger(__name__) @@ -403,7 +404,7 @@ class UserDirectoryHandler(object): if change: users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): + for user_id, profile in iteritems(users_with_profile): yield self._handle_new_user(room_id, user_id, profile) else: users = yield self.store.get_users_in_public_due_to_room(room_id) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 6fec3de399..50e5b48a2b 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -15,6 +15,7 @@ import os +from six import iteritems TICKS_PER_SEC = 100 BYTES_PER_PAGE = 4096 @@ -55,7 +56,7 @@ def update_resource_metrics(): # line is PID (command) more stats go here ... raw_stats = line.split(") ", 1)[1].split(" ") - for (name, index) in STAT_FIELDS.iteritems(): + for (name, index) in iteritems(STAT_FIELDS): # subtract 3 from the index, because proc(5) is 1-based, and # we've lost the first two fields in PID and COMMAND above stats[name] = int(raw_stats[index - 3]) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7c680659b6..2f7e77f5f5 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -30,6 +30,7 @@ from synapse.state import POWER_KEY from collections import namedtuple +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -126,7 +127,7 @@ class BulkPushRuleEvaluator(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { - (e.type, e.state_key): e for e in auth_events.itervalues() + (e.type, e.state_key): e for e in itervalues(auth_events) } sender_level = get_user_power_level(event.sender, auth_events) @@ -160,7 +161,7 @@ class BulkPushRuleEvaluator(object): condition_cache = {} - for uid, rules in rules_by_user.iteritems(): + for uid, rules in iteritems(rules_by_user): if event.sender == uid: continue @@ -406,7 +407,7 @@ class RulesForRoom(object): # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: - for event_id in member_event_ids.itervalues(): + for event_id in itervalues(member_event_ids): if event_id == event.event_id: members[event_id] = (event.state_key, event.membership) @@ -414,7 +415,7 @@ class RulesForRoom(object): logger.debug("Found members %r: %r", self.room_id, members.values()) interested_in_user_ids = set( - user_id for user_id, membership in members.itervalues() + user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN ) @@ -426,7 +427,7 @@ class RulesForRoom(object): ) user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher + uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher ) logger.debug("With pushers: %r", user_ids) @@ -447,7 +448,7 @@ class RulesForRoom(object): ) ret_rules_by_user.update( - item for item in rules_by_user.iteritems() if item[0] is not None + item for item in iteritems(rules_by_user) if item[0] is not None ) self.update_cache(sequence, members, ret_rules_by_user, state_group) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d7d38464b2..7ca1588f6a 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -68,6 +68,7 @@ import synapse.metrics import struct import fcntl +from six import iterkeys, iteritems metrics = synapse.metrics.get_metrics_for(__name__) @@ -392,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): if stream_name == "ALL": # Subscribe to all streams we're publishing to. - for stream in self.streamer.streams_by_name.iterkeys(): + for stream in iterkeys(self.streamer.streams_by_name): self.subscribe_to_stream(stream, token) else: self.subscribe_to_stream(stream_name, token) @@ -498,7 +499,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): BaseReplicationStreamProtocol.connectionMade(self) # Once we've connected subscribe to the necessary streams - for stream_name, token in self.handler.get_streams_to_replicate().iteritems(): + for stream_name, token in iteritems(self.handler.get_streams_to_replicate()): self.replicate(stream_name, token) # Tell the server if we have any users currently syncing (should only @@ -633,7 +634,7 @@ metrics.register_callback( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.inbound_commands_counter.counts.iteritems() + for k, count in iteritems(p.inbound_commands_counter.counts) }, labels=["command", "name", "conn_id"], ) @@ -643,7 +644,7 @@ metrics.register_callback( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.outbound_commands_counter.counts.iteritems() + for k, count in iteritems(p.outbound_commands_counter.counts) }, labels=["command", "name", "conn_id"], ) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..d1c291e17d 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -26,6 +26,7 @@ from synapse.util.metrics import Measure, measure_func import logging import synapse.metrics +from six import itervalues metrics = synapse.metrics.get_metrics_for(__name__) stream_updates_counter = metrics.register_counter( @@ -79,7 +80,7 @@ class ReplicationStreamer(object): # We only support federation stream if federation sending hase been # disabled on the master. self.streams = [ - stream(hs) for stream in STREAMS_MAP.itervalues() + stream(hs) for stream in itervalues(STREAMS_MAP) if stream != FederationStream or not hs.config.send_federation ] diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 9800ce7581..2ac767d2dc 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -48,6 +48,7 @@ import shutil import cgi import logging from six.moves.urllib import parse as urlparse +from six import iteritems logger = logging.getLogger(__name__) @@ -603,7 +604,7 @@ class MediaRepository(object): thumbnails[(t_width, t_height, r_type)] = r_method # Now we generate the thumbnails for each dimension, store it - for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): + for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ba46907737..ce338514e8 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -22,6 +22,8 @@ from . import background_updates from synapse.util.caches import CACHE_SIZE_FACTOR +from six import iteritems + logger = logging.getLogger(__name__) @@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): def _update_client_ips_batch_txn(self, txn, to_update): self.database_engine.lock_table(txn, "user_ips") - for entry in to_update.iteritems(): + for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry self._simple_upsert_txn( @@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_agent": user_agent, "last_seen": last_seen, } - for (access_token, ip), (user_agent, last_seen) in results.iteritems() + for (access_token, ip), (user_agent, last_seen) in iteritems(results) )) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 712106b83a..d149d8392e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, []) if len(query_map) >= 20: - now_stream_id = max(stream_id for stream_id in query_map.itervalues()) + now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( txn, query_map.keys(), include_all_devices=True @@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore): """ results = [] - for user_id, user_devices in devices.iteritems(): + for user_id, user_devices in iteritems(devices): # The prev_id for the first row is always the last row before # `from_stream_id` txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) rows = txn.fetchall() prev_id = rows[0][0] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): stream_id = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore): if devices: user_devices = devices[user_id] results = [] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): result = { "device_id": device_id, } diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ff8538ddf8..b146487943 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -21,6 +21,8 @@ import simplejson as json from ._base import SQLBaseStore +from six import iteritems + class EndToEndKeyStore(SQLBaseStore): def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): @@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore): query_list, include_all_devices, ) - for user_id, device_keys in results.iteritems(): - for device_id, device_info in device_keys.iteritems(): + for user_id, device_keys in iteritems(results): + for device_id, device_info in iteritems(device_keys): device_info["keys"] = json.loads(device_info.pop("key_json")) defer.returnValue(results) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f084a5f54b..d0350ee5fe 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks import logging import simplejson as json +from six import iteritems + logger = logging.getLogger(__name__) @@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): txn.executemany(sql, ( _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.iteritems() + for user_id, actions in iteritems(user_id_actions) )) return self.runInteraction( -- cgit 1.4.1 From fcc525b0b705703fead3d703c0df62a264ff8ce8 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 21 May 2018 19:48:57 -0500 Subject: rest of the changes --- synapse/http/request_metrics.py | 106 ++++------------- synapse/http/server.py | 4 +- synapse/push/bulk_push_rule_evaluator.py | 28 ++--- synapse/replication/tcp/resource.py | 30 +++-- synapse/storage/_base.py | 17 ++- synapse/storage/events.py | 28 ++--- tests/metrics/__init__.py | 0 tests/metrics/test_metric.py | 192 ------------------------------- 8 files changed, 68 insertions(+), 337 deletions(-) delete mode 100644 tests/metrics/__init__.py delete mode 100644 tests/metrics/test_metric.py (limited to 'synapse/replication/tcp/resource.py') diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 8c850bf23f..34a730d5bc 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -16,86 +16,38 @@ import logging -import synapse.metrics +from prometheus_client.core import Counter, Histogram + from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for("synapse.http.server") # total number of responses served, split by method/servlet/tag -response_count = metrics.register_counter( - "response_count", - labels=["method", "servlet", "tag"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_requests", - "_response_time:count", - "_response_ru_utime:count", - "_response_ru_stime:count", - "_response_db_txn_count:count", - "_response_db_txn_duration:count", - ) - ) -) +response_count = Counter("synapse_http_server_response_count", "", ["method", "servlet", "tag"]) -requests_counter = metrics.register_counter( - "requests_received", - labels=["method", "servlet", ], -) +requests_counter = Counter("synapse_http_server_requests_received", "", ["method", "servlet"]) -outgoing_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_responses_counter = Counter("synapse_http_server_responses", "", ["method", "code"]) -response_timer = metrics.register_counter( - "response_time_seconds", - labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_time:total", - ), -) +response_timer = Histogram("synapse_http_server_response_time_seconds", "", ["method", "servlet", "tag"]) -response_ru_utime = metrics.register_counter( - "response_ru_utime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_utime:total", - ), -) +response_ru_utime = Counter("synapse_http_server_response_ru_utime_seconds", "", ["method", "servlet", "tag"]) -response_ru_stime = metrics.register_counter( - "response_ru_stime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_stime:total", - ), -) +response_ru_stime = Counter("synapse_http_server_response_ru_stime_seconds", "", ["method", "servlet", "tag"]) -response_db_txn_count = metrics.register_counter( - "response_db_txn_count", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_count:total", - ), -) +response_db_txn_count = Counter("synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]) # seconds spent waiting for db txns, excluding scheduling time, when processing # this request -response_db_txn_duration = metrics.register_counter( - "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_duration:total", - ), -) +response_db_txn_duration = Counter("synapse_http_server_response_db_txn_duration_seconds", "", ["method", "servlet", "tag"]) # seconds spent waiting for a db connection, when processing this request -response_db_sched_duration = metrics.register_counter( - "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] +response_db_sched_duration = Counter("synapse_http_request_response_db_sched_duration_seconds", "", ["method", "servlet", "tag"] ) # size in bytes of the response written -response_size = metrics.register_counter( - "response_size", labels=["method", "servlet", "tag"] +response_size = Counter("synapse_http_request_response_size", "", ["method", "servlet", "tag"] ) @@ -119,31 +71,19 @@ class RequestMetrics(object): ) return - outgoing_responses_counter.inc(request.method, str(request.code)) + outgoing_responses_counter.labels(request.method, str(request.code)).inc() - response_count.inc(request.method, self.name, tag) + response_count.labels(request.method, self.name, tag).inc() - response_timer.inc_by( - time_msec - self.start, request.method, - self.name, tag - ) + response_timer.labels(request.method, self.name, tag).observe(time_msec - self.start) ru_utime, ru_stime = context.get_resource_usage() - response_ru_utime.inc_by( - ru_utime, request.method, self.name, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, self.name, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, self.name, tag - ) - response_db_txn_duration.inc_by( - context.db_txn_duration_ms / 1000., request.method, self.name, tag - ) - response_db_sched_duration.inc_by( - context.db_sched_duration_ms / 1000., request.method, self.name, tag - ) - - response_size.inc_by(request.sentLength, request.method, self.name, tag) + response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime) + response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime) + response_db_txn_count.labels(request.method, self.name, tag).inc(context.db_txn_count) + response_db_txn_duration.labels(request.method, self.name, tag).inc(context.db_txn_duration_ms / 1000.) + response_db_sched_duration.labels(request.method, self.name, tag).inc( + context.db_sched_duration_ms / 1000.) + + response_size.labels(request.method, self.name, tag).inc(request.sentLength) diff --git a/synapse/http/server.py b/synapse/http/server.py index b6e2ae14a2..f72d986288 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -138,8 +138,8 @@ def wrap_request_handler_with_logging(h): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.inc(request.method, - request.request_metrics.name) + requests_counter.labels(request.method, + request.request_metrics.name).inc() yield d return wrapped_request_handler diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7c680659b6..6fcca5e260 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -22,35 +22,29 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.event_auth import get_user_power_level from synapse.api.constants import EventTypes, Membership -from synapse.metrics import get_metrics_for -from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches import register_cache from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer from synapse.state import POWER_KEY from collections import namedtuple - +from prometheus_client import Counter logger = logging.getLogger(__name__) rules_by_room = {} -push_metrics = get_metrics_for(__name__) -push_rules_invalidation_counter = push_metrics.register_counter( - "push_rules_invalidation_counter" -) -push_rules_state_size_counter = push_metrics.register_counter( - "push_rules_state_size_counter" -) +push_rules_invalidation_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "") +push_rules_state_size_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "") # Measures whether we use the fast path of using state deltas, or if we have to # recalculate from scratch -push_rules_delta_state_cache_metric = cache_metrics.register_cache( +push_rules_delta_state_cache_metric = register_cache( "cache", - size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values - cache_name="push_rules_delta_state_cache_metric", + "push_rules_delta_state_cache_metric", + cache=[], # Meaningless size, as this isn't a cache that stores values ) @@ -64,10 +58,10 @@ class BulkPushRuleEvaluator(object): self.store = hs.get_datastore() self.auth = hs.get_auth() - self.room_push_rule_cache_metrics = cache_metrics.register_cache( + self.room_push_rule_cache_metrics = register_cache( "cache", - size_callback=lambda: 0, # There's not good value for this - cache_name="room_push_rule_cache", + "room_push_rule_cache", + cache=[], # Meaningless size, as this isn't a cache that stores values ) @defer.inlineCallbacks @@ -309,7 +303,7 @@ class RulesForRoom(object): current_state_ids = context.current_state_ids push_rules_delta_state_cache_metric.inc_misses() - push_rules_state_size_counter.inc_by(len(current_state_ids)) + push_rules_state_size_counter.inc(len(current_state_ids)) logger.debug( "Looking for member changes in %r %r", state_group, current_state_ids diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..0e6b1957c6 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -22,20 +22,19 @@ from .streams import STREAMS_MAP, FederationStream from .protocol import ServerReplicationStreamProtocol from synapse.util.metrics import Measure, measure_func +from synapse.metrics import LaterGauge import logging -import synapse.metrics +from prometheus_client import Counter -metrics = synapse.metrics.get_metrics_for(__name__) -stream_updates_counter = metrics.register_counter( - "stream_updates", labels=["stream_name"] +stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"] ) -user_sync_counter = metrics.register_counter("user_sync") -federation_ack_counter = metrics.register_counter("federation_ack") -remove_pusher_counter = metrics.register_counter("remove_pusher") -invalidate_cache_counter = metrics.register_counter("invalidate_cache") -user_ip_cache_counter = metrics.register_counter("user_ip_cache") +user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") +federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "") +remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") +invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", "") +user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") logger = logging.getLogger(__name__) @@ -73,7 +72,8 @@ class ReplicationStreamer(object): # Current connections. self.connections = [] - metrics.register_callback("total_connections", lambda: len(self.connections)) + l = LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], lambda: len(self.connections)) + l.register() # List of streams that clients can subscribe to. # We only support federation stream if federation sending hase been @@ -85,17 +85,15 @@ class ReplicationStreamer(object): self.streams_by_name = {stream.NAME: stream for stream in self.streams} - metrics.register_callback( - "connections_per_stream", + LaterGauge( + "synapse_replication_tcp_resource_connections_per_stream", "", ["stream_name"], lambda: { (stream_name,): len([ conn for conn in self.connections if stream_name in conn.replication_streams ]) for stream_name in self.streams_by_name - }, - labels=["stream_name"], - ) + }).register() self.federation_sender = None if not hs.config.send_federation: @@ -175,7 +173,7 @@ class ReplicationStreamer(object): logger.info( "Streaming: %s -> %s", stream.NAME, updates[-1][0] ) - stream_updates_counter.inc_by(len(updates), stream.NAME) + stream_updates_counter.labels(stream.NAME).inc(len(updates)) # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2262776ab2..d1b625dc30 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,8 +18,8 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine -import synapse.metrics +from prometheus_client import Histogram from twisted.internet import defer @@ -34,13 +34,10 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") +sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "") -metrics = synapse.metrics.get_metrics_for("synapse.storage") - -sql_scheduling_timer = metrics.register_distribution("schedule_time") - -sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) -sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_query_timer = Histogram("synapse_storage_query_time", "", ["verb"]) +sql_txn_timer = Histogram("synapse_storage_transaction_time", "", ["desc"]) class LoggingTransaction(object): @@ -117,7 +114,7 @@ class LoggingTransaction(object): finally: msecs = (time.time() * 1000) - start sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) - sql_query_timer.inc_by(msecs, sql.split()[0]) + sql_query_timer.labels(sql.split()[0]).observe(msecs) class PerformanceCounters(object): @@ -287,7 +284,7 @@ class SQLBaseStore(object): self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) - sql_txn_timer.inc_by(duration, desc) + sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): @@ -349,7 +346,7 @@ class SQLBaseStore(object): def inner_func(conn, *args, **kwargs): with LoggingContext("runWithConnection") as context: sched_duration_ms = time.time() * 1000 - start_time - sql_scheduling_timer.inc_by(sched_duration_ms) + sql_scheduling_timer.observe(sched_duration_ms) current_context.add_database_scheduled(sched_duration_ms) if self.database_engine.is_connection_closed(conn): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 05cde96afc..96b48cfdbb 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -40,30 +40,24 @@ import synapse.metrics from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 -logger = logging.getLogger(__name__) +from prometheus_client import Counter +logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) -persist_event_counter = metrics.register_counter("persisted_events") -event_counter = metrics.register_counter( - "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] -) +persist_event_counter = Counter("synapse_storage_events_persisted_events", "") +event_counter = Counter("synapse_storage_events_persisted_events_sep", "", ["type", "origin_type", "origin_entity"]) # The number of times we are recalculating the current state -state_delta_counter = metrics.register_counter( - "state_delta", -) +state_delta_counter = Counter("synapse_storage_events_state_delta", "") + # The number of times we are recalculating state when there is only a # single forward extremity -state_delta_single_event_counter = metrics.register_counter( - "state_delta_single_event", -) +state_delta_single_event_counter = Counter("synapse_storage_events_state_delta_single_event", "") + # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. -state_delta_reuse_delta_counter = metrics.register_counter( - "state_delta_reuse_delta", -) +state_delta_reuse_delta_counter = Counter("synapse_storage_events_state_delta_reuse_delta", "") def encode_json(json_object): @@ -445,7 +439,7 @@ class EventsStore(EventsWorkerStore): state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) - persist_event_counter.inc_by(len(chunk)) + persist_event_counter.inc(len(chunk)) synapse.metrics.event_persisted_position.set( chunk[-1][0].internal_metadata.stream_ordering, ) @@ -460,7 +454,7 @@ class EventsStore(EventsWorkerStore): origin_type = "remote" origin_entity = get_domain_from_id(event.sender) - event_counter.inc(event.type, origin_type, origin_entity) + event_counter.labels(event.type, origin_type, origin_entity).inc() for room_id, new_state in current_state_for_room.iteritems(): self.get_current_state_ids.prefill( diff --git a/tests/metrics/__init__.py b/tests/metrics/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py deleted file mode 100644 index 069c0be762..0000000000 --- a/tests/metrics/test_metric.py +++ /dev/null @@ -1,192 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from tests import unittest - -from synapse.metrics.metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - _escape_label_value, -) - - -class CounterMetricTestCase(unittest.TestCase): - - def test_scalar(self): - counter = CounterMetric("scalar") - - self.assertEquals(counter.render(), [ - 'scalar 0', - ]) - - counter.inc() - - self.assertEquals(counter.render(), [ - 'scalar 1', - ]) - - counter.inc_by(2) - - self.assertEquals(counter.render(), [ - 'scalar 3' - ]) - - def test_vector(self): - counter = CounterMetric("vector", labels=["method"]) - - # Empty counter doesn't yet know what values it has - self.assertEquals(counter.render(), []) - - counter.inc("GET") - - self.assertEquals(counter.render(), [ - 'vector{method="GET"} 1', - ]) - - counter.inc("GET") - counter.inc("PUT") - - self.assertEquals(counter.render(), [ - 'vector{method="GET"} 2', - 'vector{method="PUT"} 1', - ]) - - -class CallbackMetricTestCase(unittest.TestCase): - - def test_scalar(self): - d = dict() - - metric = CallbackMetric("size", lambda: len(d)) - - self.assertEquals(metric.render(), [ - 'size 0', - ]) - - d["key"] = "value" - - self.assertEquals(metric.render(), [ - 'size 1', - ]) - - def test_vector(self): - vals = dict() - - metric = CallbackMetric("values", lambda: vals, labels=["type"]) - - self.assertEquals(metric.render(), []) - - # Keys have to be tuples, even if they're 1-element - vals[("foo",)] = 1 - vals[("bar",)] = 2 - - self.assertEquals(metric.render(), [ - 'values{type="bar"} 2', - 'values{type="foo"} 1', - ]) - - -class DistributionMetricTestCase(unittest.TestCase): - - def test_scalar(self): - metric = DistributionMetric("thing") - - self.assertEquals(metric.render(), [ - 'thing:count 0', - 'thing:total 0', - ]) - - metric.inc_by(500) - - self.assertEquals(metric.render(), [ - 'thing:count 1', - 'thing:total 500', - ]) - - def test_vector(self): - metric = DistributionMetric("queries", labels=["verb"]) - - self.assertEquals(metric.render(), []) - - metric.inc_by(300, "SELECT") - metric.inc_by(200, "SELECT") - metric.inc_by(800, "INSERT") - - self.assertEquals(metric.render(), [ - 'queries:count{verb="INSERT"} 1', - 'queries:count{verb="SELECT"} 2', - 'queries:total{verb="INSERT"} 800', - 'queries:total{verb="SELECT"} 500', - ]) - - -class CacheMetricTestCase(unittest.TestCase): - - def test_cache(self): - d = dict() - - metric = CacheMetric("cache", lambda: len(d), "cache_name") - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 0', - 'cache:total{name="cache_name"} 0', - 'cache:size{name="cache_name"} 0', - 'cache:evicted_size{name="cache_name"} 0', - ]) - - metric.inc_misses() - d["key"] = "value" - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 0', - 'cache:total{name="cache_name"} 1', - 'cache:size{name="cache_name"} 1', - 'cache:evicted_size{name="cache_name"} 0', - ]) - - metric.inc_hits() - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 1', - 'cache:total{name="cache_name"} 2', - 'cache:size{name="cache_name"} 1', - 'cache:evicted_size{name="cache_name"} 0', - ]) - - metric.inc_evictions(2) - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 1', - 'cache:total{name="cache_name"} 2', - 'cache:size{name="cache_name"} 1', - 'cache:evicted_size{name="cache_name"} 2', - ]) - - -class LabelValueEscapeTestCase(unittest.TestCase): - def test_simple(self): - string = "safjhsdlifhyskljfksdfh" - self.assertEqual(string, _escape_label_value(string)) - - def test_escape(self): - self.assertEqual( - "abc\\\"def\\nghi\\\\", - _escape_label_value("abc\"def\nghi\\"), - ) - - def test_sequence_of_escapes(self): - self.assertEqual( - "abc\\\"def\\nghi\\\\\\n", - _escape_label_value("abc\"def\nghi\\\n"), - ) -- cgit 1.4.1 From 9ea219c5143de009c8e411fbb01da1da5ce50829 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 17 May 2018 17:35:31 +0100 Subject: Send users a server notice about consent When a user first syncs, we will send them a server notice asking them to consent to the privacy policy if they have not already done so. --- synapse/config/consent_config.py | 8 ++ synapse/handlers/presence.py | 10 +- synapse/replication/tcp/resource.py | 2 + synapse/server.py | 5 + synapse/server.pyi | 4 + synapse/server_notices/consent_server_notices.py | 101 +++++++++++++++++++++ synapse/server_notices/server_notices_sender.py | 58 ++++++++++++ synapse/storage/registration.py | 46 ++++++++-- .../49/add_user_consent_server_notice_sent.sql | 20 ++++ tests/storage/test_registration.py | 11 ++- tests/utils.py | 1 + 11 files changed, 255 insertions(+), 11 deletions(-) create mode 100644 synapse/server_notices/consent_server_notices.py create mode 100644 synapse/server_notices/server_notices_sender.py create mode 100644 synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql (limited to 'synapse/replication/tcp/resource.py') diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index 45856b9e8a..a6fbc5a058 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -30,9 +30,17 @@ DEFAULT_CONFIG = """\ # the version to be served by the consent resource if there is no 'v' # parameter. # +# 'server_notice_content', if enabled, will send a user a "Server Notice" +# asking them to consent to the privacy policy. The 'server_notices' section +# must also be configured for this to work. +# # user_consent: # template_dir: res/templates/privacy # version: 1.0 +# server_notice_content: +# msgtype: m.text +# body: | +# Pls do consent kthx """ diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 91218e40e6..adc816f747 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -87,6 +87,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id self.clock = hs.get_clock() @@ -94,8 +99,8 @@ class PresenceHandler(object): self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() - self.state = hs.get_state_handler() + self._server_notices_sender = hs.get_server_notices_sender() federation_registry = hs.get_federation_registry() @@ -428,6 +433,9 @@ class PresenceHandler(object): last_user_sync_ts=self.clock.time_msec(), )]) + # send any outstanding server notices to the user. + yield self._server_notices_sender.on_user_syncing(user_id) + @defer.inlineCallbacks def _end(): try: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..a603c520ea 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -69,6 +69,7 @@ class ReplicationStreamer(object): self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self._server_notices_sender = hs.get_server_notices_sender() # Current connections. self.connections = [] @@ -253,6 +254,7 @@ class ReplicationStreamer(object): yield self.store.insert_client_ip( user_id, access_token, ip, user_agent, device_id, last_seen, ) + yield self._server_notices_sender.on_user_ip(user_id) def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients. diff --git a/synapse/server.py b/synapse/server.py index 85f54cd047..e7c733f2d4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -73,6 +73,7 @@ from synapse.rest.media.v1.media_repository import ( MediaRepositoryResource, ) from synapse.server_notices.server_notices_manager import ServerNoticesManager +from synapse.server_notices.server_notices_sender import ServerNoticesSender from synapse.state import StateHandler, StateResolutionHandler from synapse.storage import DataStore from synapse.streams.events import EventSources @@ -158,6 +159,7 @@ class HomeServer(object): 'room_member_handler', 'federation_registry', 'server_notices_manager', + 'server_notices_sender', ] def __init__(self, hostname, **kwargs): @@ -403,6 +405,9 @@ class HomeServer(object): def build_server_notices_manager(self): return ServerNoticesManager(self) + def build_server_notices_sender(self): + return ServerNoticesSender(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/server.pyi b/synapse/server.pyi index 6fbe15168d..ce28486233 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -10,6 +10,7 @@ import synapse.handlers.e2e_keys import synapse.handlers.set_password import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager +import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage @@ -69,3 +70,6 @@ class HomeServer(object): def get_server_notices_manager(self) -> synapse.server_notices.server_notices_manager.ServerNoticesManager: pass + + def get_server_notices_sender(self) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: + pass diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py new file mode 100644 index 0000000000..e9098aef27 --- /dev/null +++ b/synapse/server_notices/consent_server_notices.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.config import ConfigError + +logger = logging.getLogger(__name__) + + +class ConsentServerNotices(object): + """Keeps track of whether we need to send users server_notices about + privacy policy consent, and sends one if we do. + """ + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ + self._server_notices_manager = hs.get_server_notices_manager() + self._store = hs.get_datastore() + + self._current_consent_version = None + self._server_notice_content = None + self._users_in_progress = set() + + consent_config = hs.config.consent_config + if consent_config is not None: + self._current_consent_version = str(consent_config["version"]) + self._server_notice_content = consent_config.get( + "server_notice_content" + ) + + if self._server_notice_content is not None: + if not self._server_notices_manager.is_enabled(): + raise ConfigError( + "user_consent configuration requires server notices, but " + "server notices are not enabled.", + ) + if 'body' not in self._server_notice_content: + raise ConfigError( + "user_consent server_notice_consent must contain a 'body' " + "key.", + ) + + @defer.inlineCallbacks + def maybe_send_server_notice_to_user(self, user_id): + """Check if we need to send a notice to this user, and does so if so + + Args: + user_id (str): user to check + + Returns: + Deferred + """ + if self._server_notice_content is None: + # not enabled + return + + # make sure we don't send two messages to the same user at once + if user_id in self._users_in_progress: + return + self._users_in_progress.add(user_id) + try: + u = yield self._store.get_user_by_id(user_id) + + if u["consent_version"] == self._current_consent_version: + # user has already consented + return + + if u["consent_server_notice_sent"] == self._current_consent_version: + # we've already sent a notice to the user + return + + # need to send a message + try: + yield self._server_notices_manager.send_notice( + user_id, self._server_notice_content, + ) + yield self._store.user_set_consent_server_notice_sent( + user_id, self._current_consent_version, + ) + except SynapseError as e: + logger.error("Error sending server notice about user consent: %s", e) + finally: + self._users_in_progress.remove(user_id) diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py new file mode 100644 index 0000000000..9eade85851 --- /dev/null +++ b/synapse/server_notices/server_notices_sender.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.server_notices.consent_server_notices import ConsentServerNotices + + +class ServerNoticesSender(object): + """A centralised place which sends server notices automatically when + Certain Events take place + """ + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ + # todo: it would be nice to make this more dynamic + self._consent_server_notices = ConsentServerNotices(hs) + + def on_user_syncing(self, user_id): + """Called when the user performs a sync operation. + + This is only called when /sync (or /events) is called on the synapse + master. In a deployment with synchrotrons, on_user_ip is called + + Args: + user_id (str): mxid of user who synced + + Returns: + Deferred + """ + return self._consent_server_notices.maybe_send_server_notice_to_user( + user_id, + ) + + def on_user_ip(self, user_id): + """Called when a worker process saw a client request. + + Args: + user_id (str): mxid + + Returns: + Deferred + """ + return self._consent_server_notices.maybe_send_server_notice_to_user( + user_id, + ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 8d1a01f1ee..a530e29f43 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -33,7 +33,10 @@ class RegistrationWorkerStore(SQLBaseStore): keyvalues={ "name": user_id, }, - retcols=["name", "password_hash", "is_guest"], + retcols=[ + "name", "password_hash", "is_guest", + "consent_version", "consent_server_notice_sent", + ], allow_none=True, desc="get_user_by_id", ) @@ -297,12 +300,41 @@ class RegistrationStore(RegistrationWorkerStore, Raises: StoreError(404) if user not found """ - return self._simple_update_one( - table='users', - keyvalues={'name': user_id, }, - updatevalues={'consent_version': consent_version, }, - desc="user_set_consent_version" - ) + def f(txn): + self._simple_update_one_txn( + txn, + table='users', + keyvalues={'name': user_id, }, + updatevalues={'consent_version': consent_version, }, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction("user_set_consent_version", f) + + def user_set_consent_server_notice_sent(self, user_id, consent_version): + """Updates the user table to record that we have sent the user a server + notice about privacy policy consent + + Args: + user_id (str): full mxid of the user to update + consent_version (str): version of the policy we have notified the + user about + + Raises: + StoreError(404) if user not found + """ + def f(txn): + self._simple_update_one_txn( + txn, + table='users', + keyvalues={'name': user_id, }, + updatevalues={'consent_server_notice_sent': consent_version, }, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction("user_set_consent_server_notice_sent", f) def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None): diff --git a/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql new file mode 100644 index 0000000000..14dcf18d73 --- /dev/null +++ b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql @@ -0,0 +1,20 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* record whether we have sent a server notice about consenting to the + * privacy policy. Specifically records the version of the policy we sent + * a message about. + */ +ALTER TABLE users ADD COLUMN consent_server_notice_sent TEXT; diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 7c7b164ee6..cc637dda1c 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -42,9 +42,14 @@ class RegistrationStoreTestCase(unittest.TestCase): yield self.store.register(self.user_id, self.tokens[0], self.pwhash) self.assertEquals( - # TODO(paul): Surely this field should be 'user_id', not 'name' - # Additionally surely it shouldn't come in a 1-element list - {"name": self.user_id, "password_hash": self.pwhash, "is_guest": 0}, + { + # TODO(paul): Surely this field should be 'user_id', not 'name' + "name": self.user_id, + "password_hash": self.pwhash, + "is_guest": 0, + "consent_version": None, + "consent_server_notice_sent": None, + }, (yield self.store.get_user_by_id(self.user_id)) ) diff --git a/tests/utils.py b/tests/utils.py index c2beb5d9f7..63d8e9c640 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -63,6 +63,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.federation_rc_concurrent = 10 config.filter_timeline_limit = 5000 config.user_directory_search_all_users = False + config.consent_config = None # disable user directory updates, because they get done in the # background, which upsets the test runner. -- cgit 1.4.1 From 8f5a688d420c8f6b51826c561da9094b76fbea1e Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 22 May 2018 10:56:03 -0500 Subject: cleanups, self-registration --- synapse/handlers/presence.py | 7 ++++--- synapse/http/request_metrics.py | 15 +++------------ synapse/replication/tcp/resource.py | 9 +++++---- 3 files changed, 12 insertions(+), 19 deletions(-) (limited to 'synapse/replication/tcp/resource.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aca822c46a..4ee87d5714 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -142,8 +142,9 @@ class PresenceHandler(object): } LaterGauge( - "user_to_current_state_size", "", [], lambda: len(self.user_to_current_state) - ).register() + "synapse_handlers_presence_user_to_current_state_size", "", [], + lambda: len(self.user_to_current_state) + ) now = self.clock.time_msec() for state in active_presence: @@ -212,7 +213,7 @@ class PresenceHandler(object): 60 * 1000, ) - LaterGauge("wheel_timer_size", "", [], lambda: len(self.wheel_timer)).register() + LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], lambda: len(self.wheel_timer)) @defer.inlineCallbacks def _on_shutdown(self): diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 0984870e7e..e7f1bfc4ae 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -71,24 +71,14 @@ in_flight_requests_db_sched_duration = Counter("synapse_http_request_in_flight_r _in_flight_requests = set() -def _collect_in_flight(): - """Called just before metrics are collected, so we use it to update all - the in flight request metrics - """ - - for rm in _in_flight_requests: - rm.update_metrics() - - -metrics.register_collector(_collect_in_flight) - - def _get_in_flight_counts(): """Returns a count of all in flight requests by (method, server_name) Returns: dict[tuple[str, str], int] """ + for rm in _in_flight_requests: + rm.update_metrics() # Map from (method, name) -> int, the number of in flight requests of that # type @@ -99,6 +89,7 @@ def _get_in_flight_counts(): return counts + LaterGauge( "synapse_http_request_metrics_in_flight_requests_count", "", ["method", "servlet"], diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index ed251c79ac..1ca30bc31a 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -73,8 +73,8 @@ class ReplicationStreamer(object): # Current connections. self.connections = [] - l = LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], lambda: len(self.connections)) - l.register() + LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], + lambda: len(self.connections)) # List of streams that clients can subscribe to. # We only support federation stream if federation sending hase been @@ -87,14 +87,15 @@ class ReplicationStreamer(object): self.streams_by_name = {stream.NAME: stream for stream in self.streams} LaterGauge( - "synapse_replication_tcp_resource_connections_per_stream", "", ["stream_name"], + "synapse_replication_tcp_resource_connections_per_stream", "", + ["stream_name"], lambda: { (stream_name,): len([ conn for conn in self.connections if stream_name in conn.replication_streams ]) for stream_name in self.streams_by_name - }).register() + }) self.federation_sender = None if not hs.config.send_federation: -- cgit 1.4.1 From b6063631c3c5ea4c4d118cc9f9d7e2896949ee1d Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 22 May 2018 17:36:20 -0500 Subject: more cleanup --- synapse/metrics/__init__.py | 4 +--- synapse/replication/tcp/protocol.py | 9 ++++++--- synapse/replication/tcp/resource.py | 7 ++++--- 3 files changed, 11 insertions(+), 9 deletions(-) (limited to 'synapse/replication/tcp/resource.py') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index bed37b5f56..75117915ff 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -21,7 +21,7 @@ import platform import attr from prometheus_client import Gauge, Histogram, Counter -from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY +from prometheus_client.core import GaugeMetricFamily, REGISTRY from twisted.internet import reactor @@ -97,8 +97,6 @@ gc_time = Histogram( class GCCounts(object): def collect(self): - gc_counts = gc.get_count() - cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index bdb110663c..5995e5d553 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -197,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - self.inbound_commands_counter[cmd_name] = self.inbound_commands_counter[cmd_name] + 1 + self.inbound_commands_counter[cmd_name] = ( + self.inbound_commands_counter[cmd_name] + 1) cmd_cls = COMMAND_MAP[cmd_name] try: @@ -247,7 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - self.outbound_commands_counter[cmd.NAME] = self.outbound_commands_counter[cmd.NAME] + 1 + self.outbound_commands_counter[cmd.NAME] = ( + self.outbound_commands_counter[cmd.NAME] + 1) string = "%s %s" % (cmd.NAME, cmd.to_line(),) if "\n" in string: raise Exception("Unexpected newline in command: %r", string) @@ -632,4 +634,5 @@ tcp_outbound_commands = LaterGauge( }) # number of updates received for each RDATA stream -inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "", ["stream_name"]) +inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "", + ["stream_name"]) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 1ca30bc31a..19987c06f0 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -28,12 +28,13 @@ import logging from prometheus_client import Counter -stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"] -) +stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", + "", ["stream_name"]) user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "") remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") -invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", "") +invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", + "") user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") logger = logging.getLogger(__name__) -- cgit 1.4.1