diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3f1cda5b0b..3e5f27046e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -19,6 +19,7 @@ from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.metrics import Measure
from twisted.internet import defer
@@ -368,50 +369,51 @@ class SyncHandler(BaseHandler):
typing events for that room.
"""
- typing_key = since_token.typing_key if since_token else "0"
+ with Measure(self.clock, "ephemeral_by_room"):
+ typing_key = since_token.typing_key if since_token else "0"
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
-
- typing_source = self.event_sources.sources["typing"]
- typing, typing_key = yield typing_source.get_new_events(
- user=sync_config.user,
- from_key=typing_key,
- limit=sync_config.filter_collection.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("typing_key", typing_key)
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
- ephemeral_by_room = {}
-
- for event in typing:
- # we want to exclude the room_id from the event, but modifying the
- # result returned by the event source is poor form (it might cache
- # the object)
- room_id = event["room_id"]
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
-
- receipt_key = since_token.receipt_key if since_token else "0"
-
- receipt_source = self.event_sources.sources["receipt"]
- receipts, receipt_key = yield receipt_source.get_new_events(
- user=sync_config.user,
- from_key=receipt_key,
- limit=sync_config.filter_collection.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+ typing_source = self.event_sources.sources["typing"]
+ typing, typing_key = yield typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=typing_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+ ephemeral_by_room = {}
+
+ for event in typing:
+ # we want to exclude the room_id from the event, but modifying the
+ # result returned by the event source is poor form (it might cache
+ # the object)
+ room_id = event["room_id"]
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ receipt_key = since_token.receipt_key if since_token else "0"
+
+ receipt_source = self.event_sources.sources["receipt"]
+ receipts, receipt_key = yield receipt_source.get_new_events(
+ user=sync_config.user,
+ from_key=receipt_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("receipt_key", receipt_key)
- for event in receipts:
- room_id = event["room_id"]
- # exclude room id, as above
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+ for event in receipts:
+ room_id = event["room_id"]
+ # exclude room id, as above
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
defer.returnValue((now_token, ephemeral_by_room))
@@ -619,58 +621,61 @@ class SyncHandler(BaseHandler):
"""
:returns a Deferred TimelineBatch
"""
- filtering_factor = 2
- timeline_limit = sync_config.filter_collection.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 10)
- max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
- end_key = room_key
-
- limited = recents is None or newly_joined_room or timeline_limit < len(recents)
-
- if recents is not None:
- recents = sync_config.filter_collection.filter_room_timeline(recents)
- recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- recents,
- is_peeking=sync_config.is_guest,
- )
- else:
- recents = []
-
- since_key = None
- if since_token and not newly_joined_room:
- since_key = since_token.room_key
-
- while limited and len(recents) < timeline_limit and max_repeat:
- events, end_key = yield self.store.get_room_events_stream_for_room(
- room_id,
- limit=load_limit + 1,
- from_key=since_key,
- to_key=end_key,
- )
- loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
- loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- loaded_recents,
- is_peeking=sync_config.is_guest,
- )
- loaded_recents.extend(recents)
- recents = loaded_recents
+ with Measure(self.clock, "load_filtered_recents"):
+ filtering_factor = 2
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
+ room_key = now_token.room_key
+ end_key = room_key
+
+ limited = recents is None or newly_joined_room or timeline_limit < len(recents)
+
+ if recents is not None:
+ recents = sync_config.filter_collection.filter_room_timeline(recents)
+ recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ recents,
+ is_peeking=sync_config.is_guest,
+ )
+ else:
+ recents = []
+
+ since_key = None
+ if since_token and not newly_joined_room:
+ since_key = since_token.room_key
+
+ while limited and len(recents) < timeline_limit and max_repeat:
+ events, end_key = yield self.store.get_room_events_stream_for_room(
+ room_id,
+ limit=load_limit + 1,
+ from_key=since_key,
+ to_key=end_key,
+ )
+ loaded_recents = sync_config.filter_collection.filter_room_timeline(
+ events
+ )
+ loaded_recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ loaded_recents,
+ is_peeking=sync_config.is_guest,
+ )
+ loaded_recents.extend(recents)
+ recents = loaded_recents
- if len(events) <= load_limit:
- limited = False
- break
- max_repeat -= 1
+ if len(events) <= load_limit:
+ limited = False
+ break
+ max_repeat -= 1
- if len(recents) > timeline_limit:
- limited = True
- recents = recents[-timeline_limit:]
- room_key = recents[0].internal_metadata.before
+ if len(recents) > timeline_limit:
+ limited = True
+ recents = recents[-timeline_limit:]
+ room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(
- "room_key", room_key
- )
+ prev_batch_token = now_token.copy_and_replace(
+ "room_key", room_key
+ )
defer.returnValue(TimelineBatch(
events=recents,
@@ -831,50 +836,53 @@ class SyncHandler(BaseHandler):
# updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
- if full_state:
- if batch:
- state = yield self.store.get_state_for_event(batch.events[0].event_id)
- else:
- state = yield self.get_state_at(
- room_id, stream_position=now_token
- )
+ with Measure(self.clock, "compute_state_delta"):
+ if full_state:
+ if batch:
+ state = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
+ else:
+ state = yield self.get_state_at(
+ room_id, stream_position=now_token
+ )
- timeline_state = {
- (event.type, event.state_key): event
- for event in batch.events if event.is_state()
- }
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
- state = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state,
- previous={},
- )
- elif batch.limited:
- state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
- )
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state,
+ previous={},
+ )
+ elif batch.limited:
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
+ )
- state_at_timeline_start = yield self.store.get_state_for_event(
- batch.events[0].event_id
- )
+ state_at_timeline_start = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
- timeline_state = {
- (event.type, event.state_key): event
- for event in batch.events if event.is_state()
- }
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
- state = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state_at_timeline_start,
- previous=state_at_previous_sync,
- )
- else:
- state = {}
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ previous=state_at_previous_sync,
+ )
+ else:
+ state = {}
- defer.returnValue({
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(state.values())
- })
+ defer.returnValue({
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ })
def check_joined_room(self, sync_config, state_delta):
"""
@@ -896,20 +904,21 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
- last_unread_event_id = self.last_read_event_id_for_room_and_user(
- room_id, sync_config.user.to_string(), ephemeral_by_room
- )
-
- notifs = []
- if last_unread_event_id:
- notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
- room_id, sync_config.user.to_string(), last_unread_event_id
+ with Measure(self.clock, "unread_notifs_for_room_id"):
+ last_unread_event_id = self.last_read_event_id_for_room_and_user(
+ room_id, sync_config.user.to_string(), ephemeral_by_room
)
- defer.returnValue(notifs)
- # There is no new information in this period, so your notification
- # count is whatever it was last time.
- defer.returnValue(None)
+ notifs = []
+ if last_unread_event_id:
+ notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+ room_id, sync_config.user.to_string(), last_unread_event_id
+ )
+ defer.returnValue(notifs)
+
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
def _action_has_highlight(actions):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 43bf600913..b16d0017df 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
from synapse.types import UserID
import logging
@@ -222,6 +223,7 @@ class TypingNotificationHandler(BaseHandler):
class TypingNotificationEventSource(object):
def __init__(self, hs):
self.hs = hs
+ self.clock = hs.get_clock()
self._handler = None
self._room_member_handler = None
@@ -247,19 +249,20 @@ class TypingNotificationEventSource(object):
}
def get_new_events(self, from_key, room_ids, **kwargs):
- from_key = int(from_key)
- handler = self.handler()
+ with Measure(self.clock, "typing.get_new_events"):
+ from_key = int(from_key)
+ handler = self.handler()
- events = []
- for room_id in room_ids:
- if room_id not in handler._room_serials:
- continue
- if handler._room_serials[room_id] <= from_key:
- continue
+ events = []
+ for room_id in room_ids:
+ if room_id not in handler._room_serials:
+ continue
+ if handler._room_serials[room_id] <= from_key:
+ continue
- events.append(self._make_event_for(room_id))
+ events.append(self._make_event_for(room_id))
- return events, handler._latest_room_serial
+ return events, handler._latest_room_serial
def get_current_key(self):
return self.handler()._latest_room_serial
|