From dcc235b47d694d9ef7181b379d4e38bed2677028 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Mon, 30 Apr 2018 16:38:23 +0200 Subject: use stand-in value if maxint is not available Signed-off-by: Adrian Tschira --- synapse/storage/_base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2262776ab2..962cc270bc 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -30,6 +30,12 @@ import threading logger = logging.getLogger(__name__) +try: + MAX_TXN_ID = sys.maxint - 1 +except AttributeError: + # python 3 does not have a maximum int value + MAX_TXN_ID = 2**63 - 1 + sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") @@ -222,7 +228,7 @@ class SQLBaseStore(object): # We don't really need these to be unique, so lets stop it from # growing really large. - self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID) name = "%s-%x" % (desc, txn_id, ) -- cgit 1.5.1 From 933bf2dd357842b0e7c3fc7a1111d89ab52f5329 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 13:19:12 +0200 Subject: replace some iteritems with six Signed-off-by: Adrian Tschira --- synapse/handlers/device.py | 14 ++++++++------ synapse/handlers/e2e_keys.py | 13 +++++++------ synapse/handlers/groups_local.py | 3 ++- synapse/handlers/presence.py | 15 ++++++++------- synapse/handlers/sync.py | 14 ++++++++------ synapse/handlers/user_directory.py | 3 ++- synapse/metrics/process_collector.py | 3 ++- synapse/push/bulk_push_rule_evaluator.py | 13 +++++++------ synapse/replication/tcp/protocol.py | 9 +++++---- synapse/replication/tcp/resource.py | 3 ++- synapse/rest/media/v1/media_repository.py | 3 ++- synapse/storage/client_ips.py | 6 ++++-- synapse/storage/devices.py | 9 +++++---- synapse/storage/end_to_end_keys.py | 6 ++++-- synapse/storage/event_push_actions.py | 4 +++- 15 files changed, 69 insertions(+), 49 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f7457a7082..31bd0e60c6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -26,6 +26,8 @@ from ._base import BaseHandler import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler): # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler): # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: if state_key != user_id: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 25aec624af..8a2d177539 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,6 +19,7 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer +from six import iteritems from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, @@ -92,7 +93,7 @@ class E2eKeysHandler(object): remote_queries_not_in_cache = {} if remote_queries: query_list = [] - for user_id, device_ids in remote_queries.iteritems(): + for user_id, device_ids in iteritems(remote_queries): if device_ids: query_list.extend((user_id, device_id) for device_id in device_ids) else: @@ -103,9 +104,9 @@ class E2eKeysHandler(object): query_list ) ) - for user_id, devices in remote_results.iteritems(): + for user_id, devices in iteritems(remote_results): user_devices = results.setdefault(user_id, {}) - for device_id, device in devices.iteritems(): + for device_id, device in iteritems(devices): keys = device.get("keys", None) device_display_name = device.get("device_display_name", None) if keys: @@ -250,9 +251,9 @@ class E2eKeysHandler(object): "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 977993e7d4..dcae083734 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -15,6 +15,7 @@ # limitations under the License. from twisted.internet import defer +from six import iteritems from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id @@ -449,7 +450,7 @@ class GroupsLocalHandler(object): results = {} failed_results = [] - for destination, dest_user_ids in destinations.iteritems(): + for destination, dest_user_ids in iteritems(destinations): try: r = yield self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids), diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 91218e40e6..b51f925220 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,6 +25,8 @@ The methods that define policy are: from twisted.internet import defer, reactor from contextlib import contextmanager +from six import itervalues, iteritems + from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState @@ -40,7 +42,6 @@ import synapse.metrics import logging - logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) @@ -526,7 +527,7 @@ class PresenceHandler(object): prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, ) - for prev_state in prev_states.itervalues() + for prev_state in itervalues(prev_states) ]) self.external_process_last_updated_ms.pop(process_id, None) @@ -549,14 +550,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { user_id: UserPresenceState.default(user_id) @@ -1044,7 +1045,7 @@ class PresenceEventSource(object): defer.returnValue((updates.values(), max_token)) else: defer.returnValue(([ - s for s in updates.itervalues() + s for s in itervalues(updates) if s.state != PresenceState.OFFLINE ], max_token)) @@ -1301,11 +1302,11 @@ def get_interested_remotes(store, states, state_handler): # hosts in those rooms. room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.iteritems(): + for room_id, states in iteritems(room_ids_to_states): hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.iteritems(): + for user_id, states in iteritems(users_to_states): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 263e42dded..d0c99c35e3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,6 +28,8 @@ import collections import logging import itertools +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -275,7 +277,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -294,7 +296,7 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -325,7 +327,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) recents = yield filter_events_for_client( self.store, @@ -382,7 +384,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) loaded_recents = yield filter_events_for_client( self.store, @@ -984,7 +986,7 @@ class SyncHandler(object): if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.itervalues() + joined_sync.timeline.events, itervalues(joined_sync.state) ) for event in it: if event.type == EventTypes.Member: @@ -1062,7 +1064,7 @@ class SyncHandler(object): newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.iteritems(): + for room_id, events in iteritems(mem_change_events_by_room_id): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 714f0195c8..cd0b7290f0 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -22,6 +22,7 @@ from synapse.util.metrics import Measure from synapse.util.async import sleep from synapse.types import get_localpart_from_id +from six import iteritems logger = logging.getLogger(__name__) @@ -403,7 +404,7 @@ class UserDirectoryHandler(object): if change: users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): + for user_id, profile in iteritems(users_with_profile): yield self._handle_new_user(room_id, user_id, profile) else: users = yield self.store.get_users_in_public_due_to_room(room_id) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 6fec3de399..50e5b48a2b 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -15,6 +15,7 @@ import os +from six import iteritems TICKS_PER_SEC = 100 BYTES_PER_PAGE = 4096 @@ -55,7 +56,7 @@ def update_resource_metrics(): # line is PID (command) more stats go here ... raw_stats = line.split(") ", 1)[1].split(" ") - for (name, index) in STAT_FIELDS.iteritems(): + for (name, index) in iteritems(STAT_FIELDS): # subtract 3 from the index, because proc(5) is 1-based, and # we've lost the first two fields in PID and COMMAND above stats[name] = int(raw_stats[index - 3]) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7c680659b6..2f7e77f5f5 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -30,6 +30,7 @@ from synapse.state import POWER_KEY from collections import namedtuple +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -126,7 +127,7 @@ class BulkPushRuleEvaluator(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { - (e.type, e.state_key): e for e in auth_events.itervalues() + (e.type, e.state_key): e for e in itervalues(auth_events) } sender_level = get_user_power_level(event.sender, auth_events) @@ -160,7 +161,7 @@ class BulkPushRuleEvaluator(object): condition_cache = {} - for uid, rules in rules_by_user.iteritems(): + for uid, rules in iteritems(rules_by_user): if event.sender == uid: continue @@ -406,7 +407,7 @@ class RulesForRoom(object): # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: - for event_id in member_event_ids.itervalues(): + for event_id in itervalues(member_event_ids): if event_id == event.event_id: members[event_id] = (event.state_key, event.membership) @@ -414,7 +415,7 @@ class RulesForRoom(object): logger.debug("Found members %r: %r", self.room_id, members.values()) interested_in_user_ids = set( - user_id for user_id, membership in members.itervalues() + user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN ) @@ -426,7 +427,7 @@ class RulesForRoom(object): ) user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher + uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher ) logger.debug("With pushers: %r", user_ids) @@ -447,7 +448,7 @@ class RulesForRoom(object): ) ret_rules_by_user.update( - item for item in rules_by_user.iteritems() if item[0] is not None + item for item in iteritems(rules_by_user) if item[0] is not None ) self.update_cache(sequence, members, ret_rules_by_user, state_group) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d7d38464b2..7ca1588f6a 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -68,6 +68,7 @@ import synapse.metrics import struct import fcntl +from six import iterkeys, iteritems metrics = synapse.metrics.get_metrics_for(__name__) @@ -392,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): if stream_name == "ALL": # Subscribe to all streams we're publishing to. - for stream in self.streamer.streams_by_name.iterkeys(): + for stream in iterkeys(self.streamer.streams_by_name): self.subscribe_to_stream(stream, token) else: self.subscribe_to_stream(stream_name, token) @@ -498,7 +499,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): BaseReplicationStreamProtocol.connectionMade(self) # Once we've connected subscribe to the necessary streams - for stream_name, token in self.handler.get_streams_to_replicate().iteritems(): + for stream_name, token in iteritems(self.handler.get_streams_to_replicate()): self.replicate(stream_name, token) # Tell the server if we have any users currently syncing (should only @@ -633,7 +634,7 @@ metrics.register_callback( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.inbound_commands_counter.counts.iteritems() + for k, count in iteritems(p.inbound_commands_counter.counts) }, labels=["command", "name", "conn_id"], ) @@ -643,7 +644,7 @@ metrics.register_callback( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.outbound_commands_counter.counts.iteritems() + for k, count in iteritems(p.outbound_commands_counter.counts) }, labels=["command", "name", "conn_id"], ) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..d1c291e17d 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -26,6 +26,7 @@ from synapse.util.metrics import Measure, measure_func import logging import synapse.metrics +from six import itervalues metrics = synapse.metrics.get_metrics_for(__name__) stream_updates_counter = metrics.register_counter( @@ -79,7 +80,7 @@ class ReplicationStreamer(object): # We only support federation stream if federation sending hase been # disabled on the master. self.streams = [ - stream(hs) for stream in STREAMS_MAP.itervalues() + stream(hs) for stream in itervalues(STREAMS_MAP) if stream != FederationStream or not hs.config.send_federation ] diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 9800ce7581..2ac767d2dc 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -48,6 +48,7 @@ import shutil import cgi import logging from six.moves.urllib import parse as urlparse +from six import iteritems logger = logging.getLogger(__name__) @@ -603,7 +604,7 @@ class MediaRepository(object): thumbnails[(t_width, t_height, r_type)] = r_method # Now we generate the thumbnails for each dimension, store it - for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): + for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ba46907737..ce338514e8 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -22,6 +22,8 @@ from . import background_updates from synapse.util.caches import CACHE_SIZE_FACTOR +from six import iteritems + logger = logging.getLogger(__name__) @@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): def _update_client_ips_batch_txn(self, txn, to_update): self.database_engine.lock_table(txn, "user_ips") - for entry in to_update.iteritems(): + for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry self._simple_upsert_txn( @@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_agent": user_agent, "last_seen": last_seen, } - for (access_token, ip), (user_agent, last_seen) in results.iteritems() + for (access_token, ip), (user_agent, last_seen) in iteritems(results) )) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 712106b83a..d149d8392e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, []) if len(query_map) >= 20: - now_stream_id = max(stream_id for stream_id in query_map.itervalues()) + now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( txn, query_map.keys(), include_all_devices=True @@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore): """ results = [] - for user_id, user_devices in devices.iteritems(): + for user_id, user_devices in iteritems(devices): # The prev_id for the first row is always the last row before # `from_stream_id` txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) rows = txn.fetchall() prev_id = rows[0][0] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): stream_id = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore): if devices: user_devices = devices[user_id] results = [] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): result = { "device_id": device_id, } diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ff8538ddf8..b146487943 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -21,6 +21,8 @@ import simplejson as json from ._base import SQLBaseStore +from six import iteritems + class EndToEndKeyStore(SQLBaseStore): def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): @@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore): query_list, include_all_devices, ) - for user_id, device_keys in results.iteritems(): - for device_id, device_info in device_keys.iteritems(): + for user_id, device_keys in iteritems(results): + for device_id, device_info in iteritems(device_keys): device_info["keys"] = json.loads(device_info.pop("key_json")) defer.returnValue(results) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f084a5f54b..d0350ee5fe 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks import logging import simplejson as json +from six import iteritems + logger = logging.getLogger(__name__) @@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): txn.executemany(sql, ( _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.iteritems() + for user_id, actions in iteritems(user_id_actions) )) return self.runInteraction( -- cgit 1.5.1 From fcc525b0b705703fead3d703c0df62a264ff8ce8 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 21 May 2018 19:48:57 -0500 Subject: rest of the changes --- synapse/http/request_metrics.py | 106 ++++------------- synapse/http/server.py | 4 +- synapse/push/bulk_push_rule_evaluator.py | 28 ++--- synapse/replication/tcp/resource.py | 30 +++-- synapse/storage/_base.py | 17 ++- synapse/storage/events.py | 28 ++--- tests/metrics/__init__.py | 0 tests/metrics/test_metric.py | 192 ------------------------------- 8 files changed, 68 insertions(+), 337 deletions(-) delete mode 100644 tests/metrics/__init__.py delete mode 100644 tests/metrics/test_metric.py (limited to 'synapse/storage') diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 8c850bf23f..34a730d5bc 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -16,86 +16,38 @@ import logging -import synapse.metrics +from prometheus_client.core import Counter, Histogram + from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for("synapse.http.server") # total number of responses served, split by method/servlet/tag -response_count = metrics.register_counter( - "response_count", - labels=["method", "servlet", "tag"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_requests", - "_response_time:count", - "_response_ru_utime:count", - "_response_ru_stime:count", - "_response_db_txn_count:count", - "_response_db_txn_duration:count", - ) - ) -) +response_count = Counter("synapse_http_server_response_count", "", ["method", "servlet", "tag"]) -requests_counter = metrics.register_counter( - "requests_received", - labels=["method", "servlet", ], -) +requests_counter = Counter("synapse_http_server_requests_received", "", ["method", "servlet"]) -outgoing_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_responses_counter = Counter("synapse_http_server_responses", "", ["method", "code"]) -response_timer = metrics.register_counter( - "response_time_seconds", - labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_time:total", - ), -) +response_timer = Histogram("synapse_http_server_response_time_seconds", "", ["method", "servlet", "tag"]) -response_ru_utime = metrics.register_counter( - "response_ru_utime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_utime:total", - ), -) +response_ru_utime = Counter("synapse_http_server_response_ru_utime_seconds", "", ["method", "servlet", "tag"]) -response_ru_stime = metrics.register_counter( - "response_ru_stime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_stime:total", - ), -) +response_ru_stime = Counter("synapse_http_server_response_ru_stime_seconds", "", ["method", "servlet", "tag"]) -response_db_txn_count = metrics.register_counter( - "response_db_txn_count", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_count:total", - ), -) +response_db_txn_count = Counter("synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]) # seconds spent waiting for db txns, excluding scheduling time, when processing # this request -response_db_txn_duration = metrics.register_counter( - "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_duration:total", - ), -) +response_db_txn_duration = Counter("synapse_http_server_response_db_txn_duration_seconds", "", ["method", "servlet", "tag"]) # seconds spent waiting for a db connection, when processing this request -response_db_sched_duration = metrics.register_counter( - "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] +response_db_sched_duration = Counter("synapse_http_request_response_db_sched_duration_seconds", "", ["method", "servlet", "tag"] ) # size in bytes of the response written -response_size = metrics.register_counter( - "response_size", labels=["method", "servlet", "tag"] +response_size = Counter("synapse_http_request_response_size", "", ["method", "servlet", "tag"] ) @@ -119,31 +71,19 @@ class RequestMetrics(object): ) return - outgoing_responses_counter.inc(request.method, str(request.code)) + outgoing_responses_counter.labels(request.method, str(request.code)).inc() - response_count.inc(request.method, self.name, tag) + response_count.labels(request.method, self.name, tag).inc() - response_timer.inc_by( - time_msec - self.start, request.method, - self.name, tag - ) + response_timer.labels(request.method, self.name, tag).observe(time_msec - self.start) ru_utime, ru_stime = context.get_resource_usage() - response_ru_utime.inc_by( - ru_utime, request.method, self.name, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, self.name, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, self.name, tag - ) - response_db_txn_duration.inc_by( - context.db_txn_duration_ms / 1000., request.method, self.name, tag - ) - response_db_sched_duration.inc_by( - context.db_sched_duration_ms / 1000., request.method, self.name, tag - ) - - response_size.inc_by(request.sentLength, request.method, self.name, tag) + response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime) + response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime) + response_db_txn_count.labels(request.method, self.name, tag).inc(context.db_txn_count) + response_db_txn_duration.labels(request.method, self.name, tag).inc(context.db_txn_duration_ms / 1000.) + response_db_sched_duration.labels(request.method, self.name, tag).inc( + context.db_sched_duration_ms / 1000.) + + response_size.labels(request.method, self.name, tag).inc(request.sentLength) diff --git a/synapse/http/server.py b/synapse/http/server.py index b6e2ae14a2..f72d986288 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -138,8 +138,8 @@ def wrap_request_handler_with_logging(h): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.inc(request.method, - request.request_metrics.name) + requests_counter.labels(request.method, + request.request_metrics.name).inc() yield d return wrapped_request_handler diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7c680659b6..6fcca5e260 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -22,35 +22,29 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.event_auth import get_user_power_level from synapse.api.constants import EventTypes, Membership -from synapse.metrics import get_metrics_for -from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches import register_cache from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer from synapse.state import POWER_KEY from collections import namedtuple - +from prometheus_client import Counter logger = logging.getLogger(__name__) rules_by_room = {} -push_metrics = get_metrics_for(__name__) -push_rules_invalidation_counter = push_metrics.register_counter( - "push_rules_invalidation_counter" -) -push_rules_state_size_counter = push_metrics.register_counter( - "push_rules_state_size_counter" -) +push_rules_invalidation_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "") +push_rules_state_size_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "") # Measures whether we use the fast path of using state deltas, or if we have to # recalculate from scratch -push_rules_delta_state_cache_metric = cache_metrics.register_cache( +push_rules_delta_state_cache_metric = register_cache( "cache", - size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values - cache_name="push_rules_delta_state_cache_metric", + "push_rules_delta_state_cache_metric", + cache=[], # Meaningless size, as this isn't a cache that stores values ) @@ -64,10 +58,10 @@ class BulkPushRuleEvaluator(object): self.store = hs.get_datastore() self.auth = hs.get_auth() - self.room_push_rule_cache_metrics = cache_metrics.register_cache( + self.room_push_rule_cache_metrics = register_cache( "cache", - size_callback=lambda: 0, # There's not good value for this - cache_name="room_push_rule_cache", + "room_push_rule_cache", + cache=[], # Meaningless size, as this isn't a cache that stores values ) @defer.inlineCallbacks @@ -309,7 +303,7 @@ class RulesForRoom(object): current_state_ids = context.current_state_ids push_rules_delta_state_cache_metric.inc_misses() - push_rules_state_size_counter.inc_by(len(current_state_ids)) + push_rules_state_size_counter.inc(len(current_state_ids)) logger.debug( "Looking for member changes in %r %r", state_group, current_state_ids diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..0e6b1957c6 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -22,20 +22,19 @@ from .streams import STREAMS_MAP, FederationStream from .protocol import ServerReplicationStreamProtocol from synapse.util.metrics import Measure, measure_func +from synapse.metrics import LaterGauge import logging -import synapse.metrics +from prometheus_client import Counter -metrics = synapse.metrics.get_metrics_for(__name__) -stream_updates_counter = metrics.register_counter( - "stream_updates", labels=["stream_name"] +stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"] ) -user_sync_counter = metrics.register_counter("user_sync") -federation_ack_counter = metrics.register_counter("federation_ack") -remove_pusher_counter = metrics.register_counter("remove_pusher") -invalidate_cache_counter = metrics.register_counter("invalidate_cache") -user_ip_cache_counter = metrics.register_counter("user_ip_cache") +user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") +federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "") +remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") +invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", "") +user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") logger = logging.getLogger(__name__) @@ -73,7 +72,8 @@ class ReplicationStreamer(object): # Current connections. self.connections = [] - metrics.register_callback("total_connections", lambda: len(self.connections)) + l = LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], lambda: len(self.connections)) + l.register() # List of streams that clients can subscribe to. # We only support federation stream if federation sending hase been @@ -85,17 +85,15 @@ class ReplicationStreamer(object): self.streams_by_name = {stream.NAME: stream for stream in self.streams} - metrics.register_callback( - "connections_per_stream", + LaterGauge( + "synapse_replication_tcp_resource_connections_per_stream", "", ["stream_name"], lambda: { (stream_name,): len([ conn for conn in self.connections if stream_name in conn.replication_streams ]) for stream_name in self.streams_by_name - }, - labels=["stream_name"], - ) + }).register() self.federation_sender = None if not hs.config.send_federation: @@ -175,7 +173,7 @@ class ReplicationStreamer(object): logger.info( "Streaming: %s -> %s", stream.NAME, updates[-1][0] ) - stream_updates_counter.inc_by(len(updates), stream.NAME) + stream_updates_counter.labels(stream.NAME).inc(len(updates)) # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2262776ab2..d1b625dc30 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,8 +18,8 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine -import synapse.metrics +from prometheus_client import Histogram from twisted.internet import defer @@ -34,13 +34,10 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") +sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "") -metrics = synapse.metrics.get_metrics_for("synapse.storage") - -sql_scheduling_timer = metrics.register_distribution("schedule_time") - -sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) -sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_query_timer = Histogram("synapse_storage_query_time", "", ["verb"]) +sql_txn_timer = Histogram("synapse_storage_transaction_time", "", ["desc"]) class LoggingTransaction(object): @@ -117,7 +114,7 @@ class LoggingTransaction(object): finally: msecs = (time.time() * 1000) - start sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) - sql_query_timer.inc_by(msecs, sql.split()[0]) + sql_query_timer.labels(sql.split()[0]).observe(msecs) class PerformanceCounters(object): @@ -287,7 +284,7 @@ class SQLBaseStore(object): self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) - sql_txn_timer.inc_by(duration, desc) + sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): @@ -349,7 +346,7 @@ class SQLBaseStore(object): def inner_func(conn, *args, **kwargs): with LoggingContext("runWithConnection") as context: sched_duration_ms = time.time() * 1000 - start_time - sql_scheduling_timer.inc_by(sched_duration_ms) + sql_scheduling_timer.observe(sched_duration_ms) current_context.add_database_scheduled(sched_duration_ms) if self.database_engine.is_connection_closed(conn): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 05cde96afc..96b48cfdbb 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -40,30 +40,24 @@ import synapse.metrics from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 -logger = logging.getLogger(__name__) +from prometheus_client import Counter +logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) -persist_event_counter = metrics.register_counter("persisted_events") -event_counter = metrics.register_counter( - "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] -) +persist_event_counter = Counter("synapse_storage_events_persisted_events", "") +event_counter = Counter("synapse_storage_events_persisted_events_sep", "", ["type", "origin_type", "origin_entity"]) # The number of times we are recalculating the current state -state_delta_counter = metrics.register_counter( - "state_delta", -) +state_delta_counter = Counter("synapse_storage_events_state_delta", "") + # The number of times we are recalculating state when there is only a # single forward extremity -state_delta_single_event_counter = metrics.register_counter( - "state_delta_single_event", -) +state_delta_single_event_counter = Counter("synapse_storage_events_state_delta_single_event", "") + # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. -state_delta_reuse_delta_counter = metrics.register_counter( - "state_delta_reuse_delta", -) +state_delta_reuse_delta_counter = Counter("synapse_storage_events_state_delta_reuse_delta", "") def encode_json(json_object): @@ -445,7 +439,7 @@ class EventsStore(EventsWorkerStore): state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) - persist_event_counter.inc_by(len(chunk)) + persist_event_counter.inc(len(chunk)) synapse.metrics.event_persisted_position.set( chunk[-1][0].internal_metadata.stream_ordering, ) @@ -460,7 +454,7 @@ class EventsStore(EventsWorkerStore): origin_type = "remote" origin_entity = get_domain_from_id(event.sender) - event_counter.inc(event.type, origin_type, origin_entity) + event_counter.labels(event.type, origin_type, origin_entity).inc() for room_id, new_state in current_state_for_room.iteritems(): self.get_current_state_ids.prefill( diff --git a/tests/metrics/__init__.py b/tests/metrics/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py deleted file mode 100644 index 069c0be762..0000000000 --- a/tests/metrics/test_metric.py +++ /dev/null @@ -1,192 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from tests import unittest - -from synapse.metrics.metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - _escape_label_value, -) - - -class CounterMetricTestCase(unittest.TestCase): - - def test_scalar(self): - counter = CounterMetric("scalar") - - self.assertEquals(counter.render(), [ - 'scalar 0', - ]) - - counter.inc() - - self.assertEquals(counter.render(), [ - 'scalar 1', - ]) - - counter.inc_by(2) - - self.assertEquals(counter.render(), [ - 'scalar 3' - ]) - - def test_vector(self): - counter = CounterMetric("vector", labels=["method"]) - - # Empty counter doesn't yet know what values it has - self.assertEquals(counter.render(), []) - - counter.inc("GET") - - self.assertEquals(counter.render(), [ - 'vector{method="GET"} 1', - ]) - - counter.inc("GET") - counter.inc("PUT") - - self.assertEquals(counter.render(), [ - 'vector{method="GET"} 2', - 'vector{method="PUT"} 1', - ]) - - -class CallbackMetricTestCase(unittest.TestCase): - - def test_scalar(self): - d = dict() - - metric = CallbackMetric("size", lambda: len(d)) - - self.assertEquals(metric.render(), [ - 'size 0', - ]) - - d["key"] = "value" - - self.assertEquals(metric.render(), [ - 'size 1', - ]) - - def test_vector(self): - vals = dict() - - metric = CallbackMetric("values", lambda: vals, labels=["type"]) - - self.assertEquals(metric.render(), []) - - # Keys have to be tuples, even if they're 1-element - vals[("foo",)] = 1 - vals[("bar",)] = 2 - - self.assertEquals(metric.render(), [ - 'values{type="bar"} 2', - 'values{type="foo"} 1', - ]) - - -class DistributionMetricTestCase(unittest.TestCase): - - def test_scalar(self): - metric = DistributionMetric("thing") - - self.assertEquals(metric.render(), [ - 'thing:count 0', - 'thing:total 0', - ]) - - metric.inc_by(500) - - self.assertEquals(metric.render(), [ - 'thing:count 1', - 'thing:total 500', - ]) - - def test_vector(self): - metric = DistributionMetric("queries", labels=["verb"]) - - self.assertEquals(metric.render(), []) - - metric.inc_by(300, "SELECT") - metric.inc_by(200, "SELECT") - metric.inc_by(800, "INSERT") - - self.assertEquals(metric.render(), [ - 'queries:count{verb="INSERT"} 1', - 'queries:count{verb="SELECT"} 2', - 'queries:total{verb="INSERT"} 800', - 'queries:total{verb="SELECT"} 500', - ]) - - -class CacheMetricTestCase(unittest.TestCase): - - def test_cache(self): - d = dict() - - metric = CacheMetric("cache", lambda: len(d), "cache_name") - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 0', - 'cache:total{name="cache_name"} 0', - 'cache:size{name="cache_name"} 0', - 'cache:evicted_size{name="cache_name"} 0', - ]) - - metric.inc_misses() - d["key"] = "value" - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 0', - 'cache:total{name="cache_name"} 1', - 'cache:size{name="cache_name"} 1', - 'cache:evicted_size{name="cache_name"} 0', - ]) - - metric.inc_hits() - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 1', - 'cache:total{name="cache_name"} 2', - 'cache:size{name="cache_name"} 1', - 'cache:evicted_size{name="cache_name"} 0', - ]) - - metric.inc_evictions(2) - - self.assertEquals(metric.render(), [ - 'cache:hits{name="cache_name"} 1', - 'cache:total{name="cache_name"} 2', - 'cache:size{name="cache_name"} 1', - 'cache:evicted_size{name="cache_name"} 2', - ]) - - -class LabelValueEscapeTestCase(unittest.TestCase): - def test_simple(self): - string = "safjhsdlifhyskljfksdfh" - self.assertEqual(string, _escape_label_value(string)) - - def test_escape(self): - self.assertEqual( - "abc\\\"def\\nghi\\\\", - _escape_label_value("abc\"def\nghi\\"), - ) - - def test_sequence_of_escapes(self): - self.assertEqual( - "abc\\\"def\\nghi\\\\\\n", - _escape_label_value("abc\"def\nghi\\\n"), - ) -- cgit 1.5.1 From d8cb7225d2902ae3dd7fbcd1b3b2ebd084d81ca4 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 22 May 2018 18:09:09 +0100 Subject: daily user type phone home stats --- synapse/app/homeserver.py | 4 ++++ synapse/storage/registration.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index caccbaa814..026422a023 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -434,6 +434,10 @@ def run(hs): total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() stats["total_nonbridged_users"] = total_nonbridged_users + daily_user_type_results = yield hs.get_datastore().count_daily_user_type() + for name, count in daily_user_type_results.iteritems(): + stats["daily_user_type_" + name] = count + room_count = yield hs.get_datastore().get_room_count() stats["total_room_count"] = room_count diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a530e29f43..d8e60d2e87 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -485,6 +485,35 @@ class RegistrationStore(RegistrationWorkerStore, ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + def count_daily_user_type(self): + """ + Counts 1) native non guest users + 2) native guests users + 3) bridged users + who registered on the homeserver in the past 24 hours + """ + def _count_daily_user_type(txn): + yesterday = int(self._clock.time()) - (60 * 60 * 24) + + sql = """ + SELECT user_type, COALESCE(count(*), 0) AS count FROM ( + SELECT + CASE + WHEN is_guest=0 AND appservice_id IS NULL THEN 'native' + WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest' + WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged' + END AS user_type + FROM users + WHERE creation_ts > ? + ) AS t GROUP BY user_type + """ + results = {'native': 0, 'guest': 0, 'bridged': 0} + txn.execute(sql, (yesterday,)) + for row in txn: + results[row[0]] = row[1] + return results + return self.runInteraction("count_daily_user_type", _count_daily_user_type) + @defer.inlineCallbacks def count_nonbridged_users(self): def _count_users(txn): -- cgit 1.5.1 From 53cc2cde1f609ec34a4ce6a7c678302c65ddfe53 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 22 May 2018 17:32:57 -0500 Subject: cleanup --- synapse/federation/send_queue.py | 5 ++--- synapse/handlers/appservice.py | 9 ++++++--- synapse/handlers/presence.py | 12 ++++++++---- synapse/http/client.py | 3 ++- synapse/http/matrixfederationclient.py | 6 ++++-- synapse/metrics/__init__.py | 28 ++++++++++++++++++++++------ synapse/push/bulk_push_rule_evaluator.py | 6 ++++-- synapse/storage/events.py | 9 ++++++--- synapse/util/metrics.py | 15 ++++++++++----- 9 files changed, 64 insertions(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index e6e1888f3a..c7ed465617 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -74,9 +74,8 @@ class FederationRemoteSendQueue(object): # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. def register(name, queue): - LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), "", - lambda: len(queue), - ) + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), + "", lambda: len(queue)) for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index a7345331af..d9f35a5dba 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -127,12 +127,15 @@ class ApplicationServicesHandler(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_positions.labels("appservice_sender").set(upper_bound) + synapse.metrics.event_processing_positions.labels( + "appservice_sender").set(upper_bound) events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_lag.labels("appservice_sender").set(now - ts) - synapse.metrics.event_processing_last_ts.labels("appservice_sender").set(ts) + synapse.metrics.event_processing_lag.labels( + "appservice_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "appservice_sender").set(ts) finally: self.is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 4ee87d5714..12939aa507 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -47,7 +47,8 @@ logger = logging.getLogger(__name__) notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") -federation_presence_out_counter = Counter("synapse_handler_presence_federation_presence_out", "") +federation_presence_out_counter = Counter( + "synapse_handler_presence_federation_presence_out", "") presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") @@ -55,8 +56,10 @@ bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) -notify_reason_counter = Counter("synapse_handler_presence_notify_reason", "", ["reason"]) -state_transition_counter = Counter("synapse_handler_presence_state_transition", "", ["from", "to"] +notify_reason_counter = Counter( + "synapse_handler_presence_notify_reason", "", ["reason"]) +state_transition_counter = Counter( + "synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -213,7 +216,8 @@ class PresenceHandler(object): 60 * 1000, ) - LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], lambda: len(self.wheel_timer)) + LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], + lambda: len(self.wheel_timer)) @defer.inlineCallbacks def _on_shutdown(self): diff --git a/synapse/http/client.py b/synapse/http/client.py index 61a1d2e2b3..4d4eee3d64 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -50,7 +50,8 @@ import urllib logger = logging.getLogger(__name__) outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) -incoming_responses_counter = Counter("synapse_http_client_responses", "", ["method", "code"]) +incoming_responses_counter = Counter("synapse_http_client_responses", "", + ["method", "code"]) class SimpleHttpClient(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 259d3884e2..77eaa06a1a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -48,8 +48,10 @@ from prometheus_client import Counter logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") -outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", "", ["method"]) -incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", "", ["method", "code"]) +outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", + "", ["method"]) +incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", + "", ["method", "code"]) MAX_LONG_RETRIES = 10 diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 38408efb54..bed37b5f56 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -21,15 +21,14 @@ import platform import attr from prometheus_client import Gauge, Histogram, Counter -from prometheus_client.core import ( - GaugeMetricFamily, CounterMetricFamily, REGISTRY) +from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY from twisted.internet import reactor logger = logging.getLogger(__name__) -running_on_pypy = platform.python_implementation() == 'PyPy' +running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] all_gauges = {} @@ -87,9 +86,16 @@ class LaterGauge(object): # gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) -gc_time = Histogram("python_gc_time", "Time taken to GC (ms)", ["gen"], buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000]) +gc_time = Histogram( + "python_gc_time", + "Time taken to GC (ms)", + ["gen"], + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], +) + class GCCounts(object): + def collect(self): gc_counts = gc.get_count() @@ -99,14 +105,23 @@ class GCCounts(object): yield cm + REGISTRY.register(GCCounts()) # # Twisted reactor metrics # -tick_time = Histogram("python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (ms)", buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000]) -pending_calls_metric = Histogram("python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000]) +tick_time = Histogram( + "python_twisted_reactor_tick_time", + "Tick time of the Twisted reactor (ms)", + buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000], +) +pending_calls_metric = Histogram( + "python_twisted_reactor_pending_calls", + "Pending calls", + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], +) # # Federation Metrics @@ -134,6 +149,7 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) + def runUntilCurrentTimer(func): @functools.wraps(func) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 6fcca5e260..b0053e7f3f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -36,8 +36,10 @@ logger = logging.getLogger(__name__) rules_by_room = {} -push_rules_invalidation_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "") -push_rules_state_size_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "") +push_rules_invalidation_counter = Counter( + "synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "") +push_rules_state_size_counter = Counter( + "synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "") # Measures whether we use the fast path of using state deltas, or if we have to # recalculate from scratch diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 00d66886ad..b96104ccae 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -45,19 +45,22 @@ from prometheus_client import Counter logger = logging.getLogger(__name__) persist_event_counter = Counter("synapse_storage_events_persisted_events", "") -event_counter = Counter("synapse_storage_events_persisted_events_sep", "", ["type", "origin_type", "origin_entity"]) +event_counter = Counter("synapse_storage_events_persisted_events_sep", "", + ["type", "origin_type", "origin_entity"]) # The number of times we are recalculating the current state state_delta_counter = Counter("synapse_storage_events_state_delta", "") # The number of times we are recalculating state when there is only a # single forward extremity -state_delta_single_event_counter = Counter("synapse_storage_events_state_delta_single_event", "") +state_delta_single_event_counter = Counter( + "synapse_storage_events_state_delta_single_event", "") # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. -state_delta_reuse_delta_counter = Counter("synapse_storage_events_state_delta_reuse_delta", "") +state_delta_reuse_delta_counter = Counter( + "synapse_storage_events_state_delta_reuse_delta", "") def encode_json(json_object): diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index a964286d85..424fdcb036 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -28,17 +28,22 @@ block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) -block_ru_utime = Counter("synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) +block_ru_utime = Counter( + "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) -block_ru_stime = Counter("synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) +block_ru_stime = Counter( + "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) -block_db_txn_count = Counter("synapse_util_metrics_block_db_txn_count", "", ["block_name"]) +block_db_txn_count = Counter( + "synapse_util_metrics_block_db_txn_count", "", ["block_name"]) # seconds spent waiting for db txns, excluding scheduling time, in this block -block_db_txn_duration = Counter("synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) +block_db_txn_duration = Counter( + "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) # seconds spent waiting for a db connection, in this block -block_db_sched_duration = Counter("synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) +block_db_sched_duration = Counter( + "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) def measure_func(name): -- cgit 1.5.1 From 095292304f04c0984813a261a7f6042b35cbd2dd Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Tue, 15 May 2018 17:52:41 +0200 Subject: Py3 storage/_base.py Signed-off-by: Adrian Tschira --- synapse/storage/_base.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2262776ab2..51cf7b4c6c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -27,6 +27,8 @@ import sys import time import threading +from six import itervalues, iterkeys, iteritems +from six.moves import intern, range logger = logging.getLogger(__name__) @@ -137,7 +139,7 @@ class PerformanceCounters(object): def interval(self, interval_duration, limit=3): counters = [] - for name, (count, cum_time) in self.current_counters.iteritems(): + for name, (count, cum_time) in iteritems(self.current_counters): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append(( (cum_time - prev_time) / interval_duration, @@ -543,7 +545,7 @@ class SQLBaseStore(object): ", ".join("%s = ?" % (k,) for k in values), " AND ".join("%s = ?" % (k,) for k in keyvalues) ) - sqlargs = values.values() + keyvalues.values() + sqlargs = list(values.values()) + list(keyvalues.values()) txn.execute(sql, sqlargs) if txn.rowcount > 0: @@ -561,7 +563,7 @@ class SQLBaseStore(object): ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues) ) - txn.execute(sql, allvalues.values()) + txn.execute(sql, list(allvalues.values())) # successfully inserted return True @@ -629,8 +631,8 @@ class SQLBaseStore(object): } if keyvalues: - sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) - txn.execute(sql, keyvalues.values()) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + txn.execute(sql, list(keyvalues.values())) else: txn.execute(sql) @@ -694,7 +696,7 @@ class SQLBaseStore(object): table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - txn.execute(sql, keyvalues.values()) + txn.execute(sql, list(keyvalues.values())) else: sql = "SELECT %s FROM %s" % ( ", ".join(retcols), @@ -725,9 +727,12 @@ class SQLBaseStore(object): if not iterable: defer.returnValue(results) + # iterables can not be sliced, so convert it to a list first + it_list = list(iterable) + chunks = [ - iterable[i:i + batch_size] - for i in xrange(0, len(iterable), batch_size) + it_list[i:i + batch_size] + for i in range(0, len(it_list), batch_size) ] for chunk in chunks: rows = yield self.runInteraction( @@ -767,7 +772,7 @@ class SQLBaseStore(object): ) values.extend(iterable) - for key, value in keyvalues.iteritems(): + for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) @@ -790,7 +795,7 @@ class SQLBaseStore(object): @staticmethod def _simple_update_txn(txn, table, keyvalues, updatevalues): if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) else: where = "" @@ -802,7 +807,7 @@ class SQLBaseStore(object): txn.execute( update_sql, - updatevalues.values() + keyvalues.values() + list(updatevalues.values()) + list(keyvalues.values()) ) return txn.rowcount @@ -850,7 +855,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k,) for k in keyvalues) ) - txn.execute(select_sql, keyvalues.values()) + txn.execute(select_sql, list(keyvalues.values())) row = txn.fetchone() if not row: @@ -888,7 +893,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - txn.execute(sql, keyvalues.values()) + txn.execute(sql, list(keyvalues.values())) if txn.rowcount == 0: raise StoreError(404, "No row found") if txn.rowcount > 1: @@ -906,7 +911,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - return txn.execute(sql, keyvalues.values()) + return txn.execute(sql, list(keyvalues.values())) def _simple_delete_many(self, table, column, iterable, keyvalues, desc): return self.runInteraction( @@ -938,7 +943,7 @@ class SQLBaseStore(object): ) values.extend(iterable) - for key, value in keyvalues.iteritems(): + for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) @@ -978,7 +983,7 @@ class SQLBaseStore(object): txn.close() if cache: - min_val = min(cache.itervalues()) + min_val = min(itervalues(cache)) else: min_val = max_value @@ -1093,7 +1098,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k,) for k in keyvalues), " ? ASC LIMIT ? OFFSET ?" ) - txn.execute(sql, keyvalues.values() + pagevalues) + txn.execute(sql, list(keyvalues.values()) + list(pagevalues)) else: sql = "SELECT %s FROM %s ORDER BY %s" % ( ", ".join(retcols), -- cgit 1.5.1 From 6c16a4ec1b6bc1aa3ef3bbcc080419e1b4bce5bd Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Tue, 15 May 2018 17:45:01 +0200 Subject: more iteritems --- synapse/storage/roommember.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 6a861943a2..7bfc3d91b5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -30,6 +30,8 @@ from synapse.types import get_domain_from_id import logging import simplejson as json +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -272,7 +274,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = {} member_event_ids = [ e_id - for key, e_id in current_state_ids.iteritems() + for key, e_id in iteritems(current_state_ids) if key[0] == EventTypes.Member ] @@ -289,7 +291,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = dict(prev_res) member_event_ids = [ e_id - for key, e_id in context.delta_ids.iteritems() + for key, e_id in iteritems(context.delta_ids) if key[0] == EventTypes.Member ] for etype, state_key in context.delta_ids: @@ -741,7 +743,7 @@ class _JoinedHostsCache(object): if state_entry.state_group == self.state_group: pass elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in state_entry.delta_ids.iteritems(): + for (typ, state_key), event_id in iteritems(state_entry.delta_ids): if typ != EventTypes.Member: continue @@ -771,7 +773,7 @@ class _JoinedHostsCache(object): self.state_group = state_entry.state_group else: self.state_group = object() - self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) + self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users)) defer.returnValue(frozenset(self.hosts_to_joined_users)) def __len__(self): -- cgit 1.5.1 From 17a70cf6e9ae016011972b4d8a97aca0c3d945d4 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 5 May 2018 22:47:18 +0200 Subject: Misc. py3 fixes Signed-off-by: Adrian Tschira --- synapse/handlers/federation.py | 9 +++++---- synapse/handlers/message.py | 9 +++++---- synapse/storage/events_worker.py | 2 +- synapse/storage/filtering.py | 2 +- synapse/storage/keys.py | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2c72beff2e..87c0615820 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -24,6 +24,7 @@ from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json import six from six.moves import http_client +from six import iteritems from twisted.internet import defer from unpaddedbase64 import decode_base64 @@ -1388,7 +1389,7 @@ class FederationHandler(BaseHandler): ) if state_groups: - _, state = state_groups.items().pop() + _, state = list(iteritems(state_groups)).pop() results = { (e.type, e.state_key): e for e in state } @@ -2034,7 +2035,7 @@ class FederationHandler(BaseHandler): this will not be included in the current_state in the context. """ state_updates = { - k: a.event_id for k, a in auth_events.iteritems() + k: a.event_id for k, a in iteritems(auth_events) if k != event_key } context.current_state_ids = dict(context.current_state_ids) @@ -2044,7 +2045,7 @@ class FederationHandler(BaseHandler): context.delta_ids.update(state_updates) context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ - k: a.event_id for k, a in auth_events.iteritems() + k: a.event_id for k, a in iteritems(auth_events) }) context.state_group = yield self.store.store_state_group( event.event_id, @@ -2096,7 +2097,7 @@ class FederationHandler(BaseHandler): def get_next(it, opt=None): try: - return it.next() + return next(it) except Exception: return opt diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c32b9bcae4..81cff0870e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -19,6 +19,7 @@ import sys from canonicaljson import encode_canonical_json import six +from six import string_types, itervalues, iteritems from twisted.internet import defer, reactor from twisted.internet.defer import succeed from twisted.python.failure import Failure @@ -402,7 +403,7 @@ class MessageHandler(BaseHandler): "avatar_url": profile.avatar_url, "display_name": profile.display_name, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) }) @@ -667,7 +668,7 @@ class EventCreationHandler(object): spam_error = self.spam_checker.check_event_for_spam(event) if spam_error: - if not isinstance(spam_error, basestring): + if not isinstance(spam_error, string_types): spam_error = "Spam is not permitted here" raise SynapseError( 403, spam_error, Codes.FORBIDDEN @@ -881,7 +882,7 @@ class EventCreationHandler(object): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.iteritems() + for k, e_id in iteritems(context.current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -895,7 +896,7 @@ class EventCreationHandler(object): "content": e.content, "sender": e.sender, } - for e in state_to_include.itervalues() + for e in itervalues(state_to_include) ] invitee = UserID.from_string(event.state_key) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index ba834854e1..32d9d00ffb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -337,7 +337,7 @@ class EventsWorkerStore(SQLBaseStore): def _fetch_event_rows(self, txn, events): rows = [] N = 200 - for i in range(1 + len(events) / N): + for i in range(1 + len(events) // N): evs = events[i * N:(i + 1) * N] if not evs: break diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 78b1e30945..2e2763126d 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore): desc="get_user_filter", ) - defer.returnValue(json.loads(str(def_json).decode("utf-8"))) + defer.returnValue(json.loads(bytes(def_json).decode("utf-8"))) def add_user_filter(self, user_localpart, user_filter): def_json = encode_canonical_json(user_filter) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 87aeaf71d6..0540c2b0b1 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -92,7 +92,7 @@ class KeyStore(SQLBaseStore): if verify_key_bytes: defer.returnValue(decode_verify_key_bytes( - key_id, str(verify_key_bytes) + key_id, bytes(verify_key_bytes) )) @defer.inlineCallbacks -- cgit 1.5.1 From dd068ca9792e7f4a51690b91262131b5dae80455 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Thu, 24 May 2018 20:52:56 +0200 Subject: remaining isintance fixes Signed-off-by: Adrian Tschira --- synapse/storage/search.py | 5 +++-- synapse/util/caches/descriptors.py | 12 ++++++++---- synapse/util/frozenutils.py | 5 +++-- 3 files changed, 14 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 6ba3e59889..a9c299a861 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -18,13 +18,14 @@ import logging import re import simplejson as json +from six import string_types + from twisted.internet import defer from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine - logger = logging.getLogger(__name__) SearchEntry = namedtuple('SearchEntry', [ @@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore): # skip over it. continue - if not isinstance(value, basestring): + if not isinstance(value, string_types): # If the event body, name or topic isn't a string # then skip over it continue diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 68285a7594..b595cd6164 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -31,6 +31,9 @@ import functools import inspect import threading +from six import string_types, itervalues +import six + logger = logging.getLogger(__name__) @@ -205,7 +208,7 @@ class Cache(object): def invalidate_all(self): self.check_thread() self.cache.clear() - for entry in self._pending_deferred_cache.itervalues(): + for entry in itervalues(self._pending_deferred_cache): entry.invalidate() self._pending_deferred_cache.clear() @@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase): ret.addErrback(onErr) - # If our cache_key is a string, try to convert to ascii to save - # a bit of space in large caches - if isinstance(cache_key, basestring): + # If our cache_key is a string on py2, try to convert to ascii + # to save a bit of space in large caches. Py3 does this + # internally automatically. + if six.PY2 and isinstance(cache_key, string_types): cache_key = to_ascii(cache_key) result_d = ObservableDeferred(ret, consumeErrors=True) diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index f497b51f4a..4cd0566f4f 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -16,6 +16,7 @@ from frozendict import frozendict import simplejson as json +from six import string_types def freeze(o): t = type(o) @@ -25,7 +26,7 @@ def freeze(o): if t is frozendict: return o - if t is str or t is unicode: + if isinstance(t, string_types): return o try: @@ -41,7 +42,7 @@ def unfreeze(o): if t is dict or t is frozendict: return dict({k: unfreeze(v) for k, v in o.items()}) - if t is str or t is unicode: + if isinstance(t, string_types): return o try: -- cgit 1.5.1 From a2eb5db4a066148da5057b8ee844c7e4f5d888bc Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 28 May 2018 19:10:27 +1000 Subject: update metrics to be in seconds --- synapse/http/request_metrics.py | 6 +++--- synapse/http/site.py | 20 ++++++++++---------- synapse/metrics/__init__.py | 19 ++++++++++--------- synapse/storage/_base.py | 30 +++++++++++++++--------------- 4 files changed, 38 insertions(+), 37 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index e7df494333..af3067b4bb 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -38,15 +38,15 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "", ["method", "servlet", "tag"] + "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"] ) response_ru_utime = Counter( - "synapse_http_server_response_ru_utime_seconds", "", ["method", "servlet", "tag"] + "synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"] ) response_ru_stime = Counter( - "synapse_http_server_response_ru_stime_seconds", "", ["method", "servlet", "tag"] + "synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"] ) response_db_txn_count = Counter( diff --git a/synapse/http/site.py b/synapse/http/site.py index 23c1b76922..60299657b9 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -83,7 +83,7 @@ class SynapseRequest(Request): return Request.render(self, resrc) def _started_processing(self, servlet_name): - self.start_time = int(time.time() * 1000) + self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( self.start_time, name=servlet_name, method=self.method, @@ -102,26 +102,26 @@ class SynapseRequest(Request): context = LoggingContext.current_context() ru_utime, ru_stime = context.get_resource_usage() db_txn_count = context.db_txn_count - db_txn_duration_ms = context.db_txn_duration_ms - db_sched_duration_ms = context.db_sched_duration_ms + db_txn_duration_sec = context.db_txn_duration_sec + db_sched_duration_sec = context.db_sched_duration_sec except Exception: ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration_ms = (0, 0) + db_txn_count, db_txn_duration_sec = (0, 0) - end_time = int(time.time() * 1000) + end_time = time.time() self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)" + " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, self.authenticated_entity, end_time - self.start_time, - int(ru_utime * 1000), - int(ru_stime * 1000), - db_sched_duration_ms, - db_txn_duration_ms, + ru_utime, + ru_stime, + db_sched_duration_sec, + db_txn_duration_sec, int(db_txn_count), self.sentLength, self.code, diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index e33ed6c9be..0c557a43f6 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -122,10 +122,10 @@ REGISTRY.register(CPUMetrics()) gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) gc_time = Histogram( "python_gc_time", - "Time taken to GC (ms)", + "Time taken to GC (sec)", ["gen"], - buckets=[2.5, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 7500, 15000, - 30000, 45000, 60000], + buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, + 5.00, 7.50, 15.00, 30.00, 45.00, 60.00], ) @@ -147,8 +147,9 @@ REGISTRY.register(GCCounts()) tick_time = Histogram( "python_twisted_reactor_tick_time", - "Tick time of the Twisted reactor (ms)", - buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000], + "Tick time of the Twisted reactor (sec)", + buckets=[0.001, 0.002, 0.005, 0.001, 0.005, 0.01. 0.025, 0.05, 0.1, 0.2, + 0.5, 1, 2, 5], ) pending_calls_metric = Histogram( "python_twisted_reactor_pending_calls", @@ -202,9 +203,9 @@ def runUntilCurrentTimer(func): num_pending += 1 num_pending += len(reactor.threadCallQueue) - start = time.time() * 1000 + start = time.time() ret = func(*args, **kwargs) - end = time.time() * 1000 + end = time.time() # record the amount of wallclock time spent running pending calls. # This is a proxy for the actual amount of time between reactor polls, @@ -225,9 +226,9 @@ def runUntilCurrentTimer(func): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) - start = time.time() * 1000 + start = time.time() unreachable = gc.collect(i) - end = time.time() * 1000 + end = time.time() gc_time.labels(i).observe(end - start) gc_unreachable.labels(i).set(unreachable) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d963af5c89..22d6257a9f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -42,10 +42,10 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") -sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "") +sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec") -sql_query_timer = Histogram("synapse_storage_query_time", "", ["verb"]) -sql_txn_timer = Histogram("synapse_storage_transaction_time", "", ["desc"]) +sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"]) +sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"]) class LoggingTransaction(object): @@ -110,7 +110,7 @@ class LoggingTransaction(object): # Don't let logging failures stop SQL from working pass - start = time.time() * 1000 + start = time.time() try: return func( @@ -120,9 +120,9 @@ class LoggingTransaction(object): logger.debug("[SQL FAIL] {%s} %s", self.name, e) raise finally: - msecs = (time.time() * 1000) - start - sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) - sql_query_timer.labels(sql.split()[0]).observe(msecs) + secs = time.time() - start + sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs) + sql_query_timer.labels(sql.split()[0]).observe(secs) class PerformanceCounters(object): @@ -132,7 +132,7 @@ class PerformanceCounters(object): def update(self, key, start_time, end_time=None): if end_time is None: - end_time = time.time() * 1000 + end_time = time.time() duration = end_time - start_time count, cum_time = self.current_counters.get(key, (0, 0)) count += 1 @@ -222,7 +222,7 @@ class SQLBaseStore(object): def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks, logging_context, func, *args, **kwargs): - start = time.time() * 1000 + start = time.time() txn_id = self._TXN_ID # We don't really need these to be unique, so lets stop it from @@ -282,13 +282,13 @@ class SQLBaseStore(object): logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: - end = time.time() * 1000 + end = time.time() duration = end - start if logging_context is not None: logging_context.add_database_transaction(duration) - transaction_logger.debug("[TXN END] {%s} %f", name, duration) + transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) @@ -349,13 +349,13 @@ class SQLBaseStore(object): """ current_context = LoggingContext.current_context() - start_time = time.time() * 1000 + start_time = time.time() def inner_func(conn, *args, **kwargs): with LoggingContext("runWithConnection") as context: - sched_duration_ms = time.time() * 1000 - start_time - sql_scheduling_timer.observe(sched_duration_ms) - current_context.add_database_scheduled(sched_duration_ms) + sched_duration_sec = time.time() - start_time + sql_scheduling_timer.observe(sched_duration_sec) + current_context.add_database_scheduled(sched_duration_sec) if self.database_engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") -- cgit 1.5.1 From 1afafb34978726b63c2a02520ff20d85382ce2bc Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Tue, 29 May 2018 17:10:06 +0200 Subject: use memoryview in py3 Signed-off-by: Adrian Tschira --- synapse/storage/keys.py | 8 ++++++++ synapse/storage/signatures.py | 12 ++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 0540c2b0b1..e56c464852 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer +import six import OpenSSL from signedjson.key import decode_verify_key_bytes @@ -26,6 +27,13 @@ import logging logger = logging.getLogger(__name__) +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 9e6eaaa532..25922e5a9c 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -14,6 +14,7 @@ # limitations under the License. from twisted.internet import defer +import six from ._base import SQLBaseStore @@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash from synapse.util.caches.descriptors import cached, cachedList +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class SignatureWorkerStore(SQLBaseStore): @cached() @@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore): for e_id, h in hashes.items() } - defer.returnValue(hashes.items()) + defer.returnValue(list(hashes.items())) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. @@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore): vals.append({ "event_id": event.event_id, "algorithm": ref_alg, - "hash": buffer(ref_hash_bytes), + "hash": db_binary_type(ref_hash_bytes), }) self._simple_insert_many_txn( -- cgit 1.5.1 From 4b9d0cde97495cee16f650f7625c89339a24230c Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Tue, 29 May 2018 17:42:43 +0200 Subject: add remaining memoryview changes --- synapse/storage/keys.py | 6 +++--- synapse/storage/transactions.py | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index e56c464852..0f13b61da8 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -80,7 +80,7 @@ class KeyStore(SQLBaseStore): values={ "from_server": from_server, "ts_added_ms": time_now_ms, - "tls_certificate": buffer(tls_certificate_bytes), + "tls_certificate": db_binary_type(tls_certificate_bytes), }, desc="store_server_certificate", ) @@ -143,7 +143,7 @@ class KeyStore(SQLBaseStore): values={ "from_server": from_server, "ts_added_ms": time_now_ms, - "verify_key": buffer(verify_key.encode()), + "verify_key": db_binary_type(verify_key.encode()), }, ) txn.call_after( @@ -180,7 +180,7 @@ class KeyStore(SQLBaseStore): "from_server": from_server, "ts_added_ms": ts_now_ms, "ts_valid_until_ms": ts_expires_ms, - "key_json": buffer(key_json_bytes), + "key_json": db_binary_type(key_json_bytes), }, desc="store_server_keys_json", ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index f825264ea9..e485d19b84 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from twisted.internet import defer +import six from canonicaljson import encode_canonical_json @@ -25,6 +26,13 @@ from collections import namedtuple import logging import simplejson as json +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + logger = logging.getLogger(__name__) @@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore): "transaction_id": transaction_id, "origin": origin, "response_code": code, - "response_json": buffer(encode_canonical_json(response_dict)), + "response_json": db_binary_type(encode_canonical_json(response_dict)), "ts": self._clock.time_msec(), }, or_ignore=True, -- cgit 1.5.1 From c379acd4fd07c41e8181af508093a50e1d894f92 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 29 May 2018 17:47:28 +0100 Subject: bump version --- synapse/storage/prepare_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c08e9cd65a..cf2aae0468 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 49 +SCHEMA_VERSION = 50 dir_path = os.path.abspath(os.path.dirname(__file__)) -- cgit 1.5.1 From 558f3d376a5a6f111bfd277528a510c8157aa99c Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 29 May 2018 17:47:55 +0100 Subject: create index in background --- synapse/storage/registration.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d8e60d2e87..c0bf5e9ed6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -101,6 +101,13 @@ class RegistrationStore(RegistrationWorkerStore, columns=["user_id", "device_id"], ) + self.register_background_index_update( + "users_creation_ts", + index_name="users_creation_ts", + table="users", + columns=["creation_ts"], + ) + # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. -- cgit 1.5.1 From ab0ef31dc72313b05ded0e2a3427d278a58ea92d Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 29 May 2018 17:51:08 +0100 Subject: create users index on creation_ts --- .../schema/delta/50/add_creation_ts_users_index.sql | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 synapse/storage/schema/delta/50/add_creation_ts_users_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql new file mode 100644 index 0000000000..ba33acfd51 --- /dev/null +++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql @@ -0,0 +1,20 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +CREATE INDEX users_creation_ts ON users(creation_ts); +INSERT into background_updates (update_name, progress_json) + VALUES ('users_creation_ts', '{}'); -- cgit 1.5.1 From 4a9cbdbc1543378a5f2a5bfde825c3886deaadf5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 May 2018 19:54:32 +0100 Subject: Exempt AS-registered users from doing gdpr --- synapse/handlers/message.py | 3 +++ synapse/storage/registration.py | 1 + 2 files changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 81cff0870e..1cb81b6cf8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -572,6 +572,9 @@ class EventCreationHandler(object): u = yield self.store.get_user_by_id(user_id) assert u is not None + if u["appservice_id"] is not None: + # users registered by an appservice are exempt + return if u["consent_version"] == self.config.user_consent_version: return diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a530e29f43..40f7cc16ee 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore): retcols=[ "name", "password_hash", "is_guest", "consent_version", "consent_server_notice_sent", + "appservice_id", ], allow_none=True, desc="get_user_by_id", -- cgit 1.5.1 From c936a52a9eb18e302fbd5158da7188f674912530 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 31 May 2018 19:03:47 +1000 Subject: Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (#3307) --- synapse/api/auth.py | 4 ++- synapse/api/filtering.py | 2 +- synapse/event_auth.py | 4 +-- synapse/events/__init__.py | 2 +- synapse/federation/federation_client.py | 4 +-- synapse/federation/send_queue.py | 2 +- synapse/federation/transaction_queue.py | 6 +++-- synapse/handlers/_base.py | 4 +-- synapse/handlers/appservice.py | 4 ++- synapse/handlers/auth.py | 6 ++--- synapse/handlers/device.py | 4 +-- synapse/handlers/federation.py | 17 ++++++------ synapse/handlers/presence.py | 12 ++++----- synapse/handlers/room.py | 2 +- synapse/handlers/room_list.py | 3 ++- synapse/handlers/search.py | 2 +- synapse/handlers/sync.py | 6 ++--- synapse/push/baserules.py | 2 +- synapse/push/mailer.py | 3 ++- synapse/push/presentable_names.py | 2 +- synapse/rest/client/transactions.py | 2 +- synapse/state.py | 3 ++- synapse/storage/events.py | 47 ++++++++++++++++++--------------- synapse/storage/presence.py | 7 ++--- synapse/storage/search.py | 4 +-- synapse/storage/state.py | 47 ++++++++++++++++++--------------- synapse/storage/user_directory.py | 8 +++--- synapse/util/caches/descriptors.py | 2 +- synapse/util/caches/treecache.py | 6 +++-- 29 files changed, 116 insertions(+), 101 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index b052cf532b..06fa38366d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -15,6 +15,8 @@ import logging +from six import itervalues + import pymacaroons from twisted.internet import defer @@ -66,7 +68,7 @@ class Auth(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { - (e.type, e.state_key): e for e in auth_events.values() + (e.type, e.state_key): e for e in itervalues(auth_events) } self.check(event, auth_events=auth_events, do_sig_check=do_sig_check) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index db43219d24..dbc0e7e445 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -411,7 +411,7 @@ class Filter(object): return room_ids def filter(self, events): - return filter(self.check, events) + return list(filter(self.check, events)) def limit(self): return self.filter_json.get("limit", 10) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index cd5627e36a..eaf9cecde6 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -471,14 +471,14 @@ def _check_power_levels(event, auth_events): ] old_list = current_state.content.get("users", {}) - for user in set(old_list.keys() + user_list.keys()): + for user in set(list(old_list) + list(user_list)): levels_to_check.append( (user, "users") ) old_list = current_state.content.get("events", {}) new_list = event.content.get("events", {}) - for ev_id in set(old_list.keys() + new_list.keys()): + for ev_id in set(list(old_list) + list(new_list)): levels_to_check.append( (ev_id, "events") ) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index c3ff85c49a..cb08da4984 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -146,7 +146,7 @@ class EventBase(object): return field in self._event_dict def items(self): - return self._event_dict.items() + return list(self._event_dict.items()) class FrozenEvent(EventBase): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2761ffae07..87a92f6ea9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -391,7 +391,7 @@ class FederationClient(FederationBase): """ if return_local: seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = seen_events.values() + signed_events = list(seen_events.values()) else: seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] @@ -589,7 +589,7 @@ class FederationClient(FederationBase): } valid_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus.values(), + destination, list(pdus.values()), outlier=True, ) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index c7ed465617..3dcc629d44 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -197,7 +197,7 @@ class FederationRemoteSendQueue(object): # We only want to send presence for our own users, so lets always just # filter here just in case. - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states)) self.presence_map.update({state.user_id: state for state in local_states}) self.presence_changed[pos] = [state.user_id for state in local_states] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 69312ec233..f0aeb5a0d3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -35,6 +35,8 @@ from synapse.metrics import ( from prometheus_client import Counter +from six import itervalues + import logging @@ -234,7 +236,7 @@ class TransactionQueue(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ logcontext.run_in_background(handle_room_events, evs) - for evs in events_by_room.itervalues() + for evs in itervalues(events_by_room) ], consumeErrors=True )) @@ -325,7 +327,7 @@ class TransactionQueue(object): if not states_map: break - yield self._process_presence_inner(states_map.values()) + yield self._process_presence_inner(list(states_map.values())) except Exception: logger.exception("Error sending presence states to servers") finally: diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e089e66fde..2d1db0c245 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -114,14 +114,14 @@ class BaseHandler(object): if guest_access != "can_join": if context: current_state = yield self.store.get_events( - context.current_state_ids.values() + list(context.current_state_ids.values()) ) else: current_state = yield self.state_handler.get_current_state( event.room_id ) - current_state = current_state.values() + current_state = list(current_state.values()) logger.info("maybe_kick_guest_users %r", current_state) yield self.kick_guest_users(current_state) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index d9f35a5dba..1c29c43a83 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from six import itervalues + import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure @@ -119,7 +121,7 @@ class ApplicationServicesHandler(object): yield make_deferred_yieldable(defer.gatherResults([ run_in_background(handle_room_events, evs) - for evs in events_by_room.itervalues() + for evs in itervalues(events_by_room) ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a5365c4fe4..3c0051586d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -249,7 +249,7 @@ class AuthHandler(BaseHandler): errordict = e.error_dict() for f in flows: - if len(set(f) - set(creds.keys())) == 0: + if len(set(f) - set(creds)) == 0: # it's very useful to know what args are stored, but this can # include the password in the case of registering, so only log # the keys (confusingly, clientdict may contain a password @@ -257,12 +257,12 @@ class AuthHandler(BaseHandler): # and is not sensitive). logger.info( "Auth completed with creds: %r. Client dict has keys: %r", - creds, clientdict.keys() + creds, list(clientdict) ) defer.returnValue((creds, clientdict, session['id'])) ret = self._auth_dict_for_flows(flows, session) - ret['completed'] = creds.keys() + ret['completed'] = list(creds) ret.update(errordict) raise InteractiveAuthIncompleteError( ret, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 31bd0e60c6..11c6fb3657 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -114,7 +114,7 @@ class DeviceHandler(BaseHandler): user_id, device_id=None ) - devices = device_map.values() + devices = list(device_map.values()) for device in devices: _update_device_from_client_ips(device, ips) @@ -187,7 +187,7 @@ class DeviceHandler(BaseHandler): defer.Deferred: """ device_map = yield self.store.get_devices_by_user(user_id) - device_ids = device_map.keys() + device_ids = list(device_map) if except_device_id is not None: device_ids = [d for d in device_ids if d != except_device_id] yield self.delete_devices(user_id, device_ids) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 87c0615820..fcf94befb7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -52,7 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.distributor import user_joined_room - logger = logging.getLogger(__name__) @@ -480,8 +479,8 @@ class FederationHandler(BaseHandler): # to get all state ids that we're interested in. event_map = yield self.store.get_events([ e_id - for key_to_eid in event_to_state_ids.itervalues() - for key, e_id in key_to_eid.iteritems() + for key_to_eid in list(event_to_state_ids.values()) + for key, e_id in key_to_eid.items() if key[0] != EventTypes.Member or check_match(key[1]) ]) @@ -1149,13 +1148,13 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - state_ids = context.prev_state_ids.values() + state_ids = list(context.prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(context.prev_state_ids.values()) + state = yield self.store.get_events(list(context.prev_state_ids.values())) defer.returnValue({ - "state": state.values(), + "state": list(state.values()), "auth_chain": auth_chain, }) @@ -1405,7 +1404,7 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - res = results.values() + res = list(results.values()) for event in res: # We sign these again because there was a bug where we # incorrectly signed things the first time round @@ -1446,7 +1445,7 @@ class FederationHandler(BaseHandler): else: results.pop((event.type, event.state_key), None) - defer.returnValue(results.values()) + defer.returnValue(list(results.values())) else: defer.returnValue([]) @@ -1915,7 +1914,7 @@ class FederationHandler(BaseHandler): }) new_state = self.state_handler.resolve_events( - [local_view.values(), remote_view.values()], + [list(local_view.values()), list(remote_view.values())], event ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 26fc0d3ec7..7fe568132f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -325,7 +325,7 @@ class PresenceHandler(object): if to_notify: notified_presence_counter.inc(len(to_notify)) - yield self._persist_and_notify(to_notify.values()) + yield self._persist_and_notify(list(to_notify.values())) self.unpersisted_users_changes |= set(s.user_id for s in new_states) self.unpersisted_users_changes -= set(to_notify.keys()) @@ -687,7 +687,7 @@ class PresenceHandler(object): """ updates = yield self.current_state_for_users(target_user_ids) - updates = updates.values() + updates = list(updates.values()) for user_id in set(target_user_ids) - set(u.user_id for u in updates): updates.append(UserPresenceState.default(user_id)) @@ -753,11 +753,11 @@ class PresenceHandler(object): self._push_to_remotes([state]) else: user_ids = yield self.store.get_users_in_room(room_id) - user_ids = filter(self.is_mine_id, user_ids) + user_ids = list(filter(self.is_mine_id, user_ids)) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes(states.values()) + self._push_to_remotes(list(states.values())) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -1051,7 +1051,7 @@ class PresenceEventSource(object): updates = yield presence.current_state_for_users(user_ids_changed) if include_offline: - defer.returnValue((updates.values(), max_token)) + defer.returnValue((list(updates.values()), max_token)) else: defer.returnValue(([ s for s in itervalues(updates) @@ -1112,7 +1112,7 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): if new_state: changes[state.user_id] = new_state - return changes.values() + return list(changes.values()) def handle_timeout(state, is_mine, syncing_user_ids, now): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b5850db42f..2abd63ad05 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -455,7 +455,7 @@ class RoomContextHandler(BaseHandler): state = yield self.store.get_state_for_events( [last_event_id], None ) - results["state"] = state[last_event_id].values() + results["state"] = list(state[last_event_id].values()) results["start"] = now_token.copy_and_replace( "room_key", results["start"] diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 5757bb7f8a..fc507cef36 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from six import iteritems from six.moves import range from ._base import BaseHandler @@ -307,7 +308,7 @@ class RoomListHandler(BaseHandler): ) event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.iteritems() + event_id for key, event_id in iteritems(current_state_ids) if key[0] in ( EventTypes.JoinRules, EventTypes.Name, diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 9772ed1a0e..1eca26aa1e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -348,7 +348,7 @@ class SearchHandler(BaseHandler): rooms = set(e.room_id for e in allowed_events) for room_id in rooms: state = yield self.state_handler.get_current_state(room_id) - state_results[room_id] = state.values() + state_results[room_id] = list(state.values()) state_results.values() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8377650b68..51ec727df0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -541,11 +541,11 @@ class SyncHandler(object): state = {} if state_ids: - state = yield self.store.get_events(state_ids.values()) + state = yield self.store.get_events(list(state_ids.values())) defer.returnValue({ (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state(state.values()) + for e in sync_config.filter_collection.filter_room_state(list(state.values())) }) @defer.inlineCallbacks @@ -894,7 +894,7 @@ class SyncHandler(object): presence.extend(states) # Deduplicate the presence entries so that there's at most one per user - presence = {p.user_id: p for p in presence}.values() + presence = list({p.user_id: p for p in presence}.values()) presence = sync_config.filter_collection.filter_presence( presence diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 7a18afe5f9..a8ae7bcd6c 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -39,7 +39,7 @@ def list_with_base_rules(rawrules): rawrules = [r for r in rawrules if r['priority_class'] >= 0] # shove the server default rules for each kind onto the end of each - current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] + current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1] ruleslist.extend(make_base_prepend_rules( PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index b5cd9b426a..d4be800e5e 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -229,7 +229,8 @@ class Mailer(object): if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]: prev_messages = room_vars['notifs'][-1]['messages'] for message in notifvars['messages']: - pm = filter(lambda pm: pm['id'] == message['id'], prev_messages) + pm = list(filter(lambda pm: pm['id'] == message['id'], + prev_messages)) if pm: if not message["is_historical"]: pm[0]["is_historical"] = False diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py index 277da3cd35..43f0c74ff3 100644 --- a/synapse/push/presentable_names.py +++ b/synapse/push/presentable_names.py @@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True # so find out who is in the room that isn't the user. if "m.room.member" in room_state_bytype_ids: member_events = yield store.get_events( - room_state_bytype_ids["m.room.member"].values() + list(room_state_bytype_ids["m.room.member"].values()) ) all_members = [ ev for ev in member_events.values() diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 20fa6678ef..7c01b438cb 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -104,7 +104,7 @@ class HttpTransactionCache(object): def _cleanup(self): now = self.clock.time_msec() - for key in self.transactions.keys(): + for key in list(self.transactions): ts = self.transactions[key][1] if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period del self.transactions[key] diff --git a/synapse/state.py b/synapse/state.py index b8c27c6815..216418f58d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -132,7 +132,8 @@ class StateHandler(object): defer.returnValue(event) return - state_map = yield self.store.get_events(state.values(), get_prev_content=False) + state_map = yield self.store.get_events(list(state.values()), + get_prev_content=False) state = { key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map } diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b96104ccae..cb1082e864 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -40,6 +40,9 @@ import synapse.metrics from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from six.moves import range +from six import itervalues, iteritems + from prometheus_client import Counter logger = logging.getLogger(__name__) @@ -245,7 +248,7 @@ class EventsStore(EventsWorkerStore): partitioned.setdefault(event.room_id, []).append((event, ctx)) deferreds = [] - for room_id, evs_ctxs in partitioned.iteritems(): + for room_id, evs_ctxs in iteritems(partitioned): d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, @@ -330,7 +333,7 @@ class EventsStore(EventsWorkerStore): chunks = [ events_and_contexts[x:x + 100] - for x in xrange(0, len(events_and_contexts), 100) + for x in range(0, len(events_and_contexts), 100) ] for chunk in chunks: @@ -364,7 +367,7 @@ class EventsStore(EventsWorkerStore): (event, context) ) - for room_id, ev_ctx_rm in events_by_room.iteritems(): + for room_id, ev_ctx_rm in iteritems(events_by_room): # Work out new extremities by recursively adding and removing # the new events. latest_event_ids = yield self.get_latest_event_ids_in_room( @@ -459,12 +462,12 @@ class EventsStore(EventsWorkerStore): event_counter.labels(event.type, origin_type, origin_entity).inc() - for room_id, new_state in current_state_for_room.iteritems(): + for room_id, new_state in iteritems(current_state_for_room): self.get_current_state_ids.prefill( (room_id, ), new_state ) - for room_id, latest_event_ids in new_forward_extremeties.iteritems(): + for room_id, latest_event_ids in iteritems(new_forward_extremeties): self.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -641,20 +644,20 @@ class EventsStore(EventsWorkerStore): """ existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(existing_state.itervalues()) - new_events = set(ev_id for ev_id in current_state.itervalues()) + existing_events = set(itervalues(existing_state)) + new_events = set(ev_id for ev_id in itervalues(current_state)) changed_events = existing_events ^ new_events if not changed_events: return to_delete = { - key: ev_id for key, ev_id in existing_state.iteritems() + key: ev_id for key, ev_id in iteritems(existing_state) if ev_id in changed_events } events_to_insert = (new_events - existing_events) to_insert = { - key: ev_id for key, ev_id in current_state.iteritems() + key: ev_id for key, ev_id in iteritems(current_state) if ev_id in events_to_insert } @@ -757,11 +760,11 @@ class EventsStore(EventsWorkerStore): ) def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): - for room_id, current_state_tuple in state_delta_by_room.iteritems(): + for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in to_delete.itervalues()], + [(ev_id,) for ev_id in itervalues(to_delete)], ) self._simple_insert_many_txn( @@ -774,7 +777,7 @@ class EventsStore(EventsWorkerStore): "type": key[0], "state_key": key[1], } - for key, ev_id in to_insert.iteritems() + for key, ev_id in iteritems(to_insert) ], ) @@ -793,7 +796,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "prev_event_id": to_delete.get(key, None), } - for key, ev_id in state_deltas.iteritems() + for key, ev_id in iteritems(state_deltas) ] ) @@ -836,7 +839,7 @@ class EventsStore(EventsWorkerStore): def _update_forward_extremities_txn(self, txn, new_forward_extremities, max_stream_order): - for room_id, new_extrem in new_forward_extremities.iteritems(): + for room_id, new_extrem in iteritems(new_forward_extremities): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -854,7 +857,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for ev_id in new_extrem ], ) @@ -871,7 +874,7 @@ class EventsStore(EventsWorkerStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for event_id in new_extrem ] ) @@ -899,7 +902,7 @@ class EventsStore(EventsWorkerStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) - return new_events_and_contexts.values() + return list(new_events_and_contexts.values()) def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): """Update min_depth for each room @@ -925,7 +928,7 @@ class EventsStore(EventsWorkerStore): event.depth, depth_updates.get(event.room_id, event.depth) ) - for room_id, depth in depth_updates.iteritems(): + for room_id, depth in iteritems(depth_updates): self._update_min_depth_for_room_txn(txn, room_id, depth) def _update_outliers_txn(self, txn, events_and_contexts): @@ -1309,7 +1312,7 @@ class EventsStore(EventsWorkerStore): " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(ev_map)),) - txn.execute(sql, ev_map.keys()) + txn.execute(sql, list(ev_map)) rows = self.cursor_to_dict(txn) for row in rows: event = ev_map[row["event_id"]] @@ -1572,7 +1575,7 @@ class EventsStore(EventsWorkerStore): chunks = [ event_ids[i:i + 100] - for i in xrange(0, len(event_ids), 100) + for i in range(0, len(event_ids), 100) ] for chunk in chunks: ev_rows = self._simple_select_many_txn( @@ -1986,7 +1989,7 @@ class EventsStore(EventsWorkerStore): logger.info("[purge] finding state groups which depend on redundant" " state groups") remaining_state_groups = [] - for i in xrange(0, len(state_rows), 100): + for i in range(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] # look for state groups whose prev_state_group is one we are about # to delete @@ -2042,7 +2045,7 @@ class EventsStore(EventsWorkerStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in curr_state.iteritems() + for key, state_id in iteritems(curr_state) ], ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 9e9d3c2591..f05d91cc58 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.api.constants import PresenceState from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util import batch_iter from collections import namedtuple from twisted.internet import defer @@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore): " AND user_id IN (%s)" ) - batches = ( - presence_states[i:i + 50] - for i in xrange(0, len(presence_states), 50) - ) - for states in batches: + for states in batch_iter(presence_states, 50): args = [stream_id] args.extend(s.user_id for s in states) txn.execute( diff --git a/synapse/storage/search.py b/synapse/storage/search.py index a9c299a861..f0fa5d7631 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -448,7 +448,7 @@ class SearchStore(BackgroundUpdateStore): "search_msgs", self.cursor_to_dict, sql, *args ) - results = filter(lambda row: row["room_id"] in room_ids, results) + results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) @@ -603,7 +603,7 @@ class SearchStore(BackgroundUpdateStore): "search_rooms", self.cursor_to_dict, sql, *args ) - results = filter(lambda row: row["room_id"] in room_ids, results) + results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ffa4246031..bdee14a8eb 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,9 @@ from collections import namedtuple import logging +from six import iteritems, itervalues +from six.moves import range + from twisted.internet import defer from synapse.storage.background_updates import BackgroundUpdateStore @@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups) defer.returnValue(group_to_state) @@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore): state_event_map = yield self.get_events( [ - ev_id for group_ids in group_to_ids.itervalues() - for ev_id in group_ids.itervalues() + ev_id for group_ids in itervalues(group_to_ids) + for ev_id in itervalues(group_ids) ], get_prev_content=False ) defer.returnValue({ group: [ - state_event_map[v] for v in event_id_map.itervalues() + state_event_map[v] for v in itervalues(event_id_map) if v in state_event_map ] - for group, event_id_map in group_to_ids.iteritems() + for group, event_id_map in iteritems(group_to_ids) }) @defer.inlineCallbacks @@ -186,7 +189,7 @@ class StateGroupWorkerStore(SQLBaseStore): """ results = {} - chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] + chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", @@ -347,21 +350,21 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups, types) state_event_map = yield self.get_events( - [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()], + [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], get_prev_content=False ) event_to_state = { event_id: { k: state_event_map[v] - for k, v in group_to_state[group].iteritems() + for k, v in iteritems(group_to_state[group]) if v in state_event_map } - for event_id, group in event_to_groups.iteritems() + for event_id, group in iteritems(event_to_groups) } defer.returnValue({event: event_to_state[event] for event in event_ids}) @@ -384,12 +387,12 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups, types) event_to_state = { event_id: group_to_state[group] - for event_id, group in event_to_groups.iteritems() + for event_id, group in iteritems(event_to_groups) } defer.returnValue({event: event_to_state[event] for event in event_ids}) @@ -503,7 +506,7 @@ class StateGroupWorkerStore(SQLBaseStore): got_all = is_all or not missing_types return { - k: v for k, v in state_dict_ids.iteritems() + k: v for k, v in iteritems(state_dict_ids) if include(k[0], k[1]) }, missing_types, got_all @@ -562,12 +565,12 @@ class StateGroupWorkerStore(SQLBaseStore): # Now we want to update the cache with all the things we fetched # from the database. - for group, group_state_dict in group_to_state_dict.iteritems(): + for group, group_state_dict in iteritems(group_to_state_dict): state_dict = results[group] state_dict.update( ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) - for k, v in group_state_dict.iteritems() + for k, v in iteritems(group_state_dict) ) self._state_group_cache.update( @@ -654,7 +657,7 @@ class StateGroupWorkerStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in delta_ids.iteritems() + for key, state_id in iteritems(delta_ids) ], ) else: @@ -669,7 +672,7 @@ class StateGroupWorkerStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in current_state_ids.iteritems() + for key, state_id in iteritems(current_state_ids) ], ) @@ -794,11 +797,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): "state_group": state_group_id, "event_id": event_id, } - for event_id, state_group_id in state_groups.iteritems() + for event_id, state_group_id in iteritems(state_groups) ], ) - for event_id, state_group_id in state_groups.iteritems(): + for event_id, state_group_id in iteritems(state_groups): txn.call_after( self._get_state_group_for_event.prefill, (event_id,), state_group_id @@ -826,7 +829,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): def reindex_txn(txn): new_last_state_group = last_state_group - for count in xrange(batch_size): + for count in range(batch_size): txn.execute( "SELECT id, room_id FROM state_groups" " WHERE ? < id AND id <= ?" @@ -884,7 +887,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): # of keys delta_state = { - key: value for key, value in curr_state.iteritems() + key: value for key, value in iteritems(curr_state) if prev_state.get(key, None) != value } @@ -924,7 +927,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in delta_state.iteritems() + for key, state_id in iteritems(delta_state) ], ) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d6e289ffbe..275c299998 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id +from six import iteritems + import re import logging @@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore): user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), profile.display_name, ) - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ @@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore): user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) - for user_id, p in users_with_profile.iteritems() + for user_id, p in iteritems(users_with_profile) ) else: # This should be unreachable. @@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore): "display_name": profile.display_name, "avatar_url": profile.avatar_url, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) ] ) for user_id in users_with_profile: diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index f4e2c30088..fc1874b65b 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -569,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase): return results return logcontext.make_deferred_yieldable(defer.gatherResults( - cached_defers.values(), + list(cached_defers.values()), consumeErrors=True, ).addCallback(update_results_dict).addErrback( unwrapFirstError diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index fcc341a6b7..dd4c9e6067 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,3 +1,5 @@ +from six import itervalues + SENTINEL = object() @@ -49,7 +51,7 @@ class TreeCache(object): if popped is SENTINEL: return default - node_and_keys = zip(nodes, key) + node_and_keys = list(zip(nodes, key)) node_and_keys.reverse() node_and_keys.append((self.root, None)) @@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d): can contain dicts. """ if isinstance(d, dict): - for value_d in d.itervalues(): + for value_d in itervalues(d): for value in iterate_tree_cache_entry(value_d): yield value else: -- cgit 1.5.1 From 4986b084f8e81e7697ea2022d306de03f8b0263e Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Fri, 1 Jun 2018 10:50:40 +0100 Subject: remove unnecessary INSERT --- synapse/storage/schema/delta/50/add_creation_ts_users_index.sql | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql index ba33acfd51..c93ae47532 100644 --- a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql +++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql @@ -15,6 +15,5 @@ -CREATE INDEX users_creation_ts ON users(creation_ts); INSERT into background_updates (update_name, progress_json) VALUES ('users_creation_ts', '{}'); -- cgit 1.5.1 From 857e6fd8b681ba3009943c2466f56fe4c3239933 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 1 Jun 2018 12:18:11 +0100 Subject: Ignore depth when updating read-receipts Order read receipts by stream ordering instead of depth --- synapse/storage/receipts.py | 67 +++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 30 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 709c69a926..dd183cebce 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from synapse.api.errors import NotFoundError from ._base import SQLBaseStore from .util.id_generators import StreamIdGenerator from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached @@ -332,6 +332,41 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + if not res: + raise NotFoundError( + "Cannot set read receipt on unknown event %s" % ( + event_id, + ), + ) + + stream_ordering = int(res["stream_ordering"]) + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + sql = ( + "SELECT stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + txn.execute(sql, (room_id, receipt_type, user_id)) + + for so, eid in txn: + if int(so) >= stream_ordering: + logger.debug( + "Ignoring new receipt for %s in favour of existing " + "one for later event %s", + event_id, eid, + ) + return False + txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) @@ -355,34 +390,6 @@ class ReceiptsStore(ReceiptsWorkerStore): (user_id, room_id, receipt_type) ) - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - topological_ordering = int(res["topological_ordering"]) if res else None - stream_ordering = int(res["stream_ordering"]) if res else None - - # We don't want to clobber receipts for more recent events, so we - # have to compare orderings of existing receipts - sql = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized as r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" - ) - - txn.execute(sql, (room_id, receipt_type, user_id)) - - if topological_ordering: - for to, so, _ in txn: - if int(to) > topological_ordering: - return False - elif int(to) == topological_ordering and int(so) >= stream_ordering: - return False - self._simple_delete_txn( txn, table="receipts_linearized", @@ -406,7 +413,7 @@ class ReceiptsStore(ReceiptsWorkerStore): } ) - if receipt_type == "m.read" and topological_ordering: + if receipt_type == "m.read": self._remove_old_push_actions_before_txn( txn, room_id=room_id, -- cgit 1.5.1 From 9f797a24a452a513628263b1b03172cee20a9856 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 1 Jun 2018 14:01:43 +0100 Subject: Handle RRs which arrive before their events --- synapse/storage/receipts.py | 44 +++++++++++++++++++------------------------- 1 file changed, 19 insertions(+), 25 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index dd183cebce..c93c228f6e 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import NotFoundError + from ._base import SQLBaseStore from .util.id_generators import StreamIdGenerator from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached @@ -340,32 +340,26 @@ class ReceiptsStore(ReceiptsWorkerStore): allow_none=True ) - if not res: - raise NotFoundError( - "Cannot set read receipt on unknown event %s" % ( - event_id, - ), - ) - - stream_ordering = int(res["stream_ordering"]) + stream_ordering = int(res["stream_ordering"]) if res else None # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts - sql = ( - "SELECT stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized as r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" - ) - txn.execute(sql, (room_id, receipt_type, user_id)) - - for so, eid in txn: - if int(so) >= stream_ordering: - logger.debug( - "Ignoring new receipt for %s in favour of existing " - "one for later event %s", - event_id, eid, - ) - return False + if stream_ordering is not None: + sql = ( + "SELECT stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + txn.execute(sql, (room_id, receipt_type, user_id)) + + for so, eid in txn: + if int(so) >= stream_ordering: + logger.debug( + "Ignoring new receipt for %s in favour of existing " + "one for later event %s", + event_id, eid, + ) + return False txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) @@ -413,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore): } ) - if receipt_type == "m.read": + if receipt_type == "m.read" and stream_ordering is not None: self._remove_old_push_actions_before_txn( txn, room_id=room_id, -- cgit 1.5.1