diff --git a/synapse/config/key.py b/synapse/config/key.py
index ac90cd3fc1..a072aec714 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -22,8 +22,14 @@ from signedjson.key import (
read_signing_keys, write_signing_keys, NACL_ED25519
)
from unpaddedbase64 import decode_base64
+from synapse.util.stringutils import random_string_with_symbols
import os
+import hashlib
+import logging
+
+
+logger = logging.getLogger(__name__)
class KeyConfig(Config):
@@ -40,9 +46,29 @@ class KeyConfig(Config):
config["perspectives"]
)
- def default_config(self, config_dir_path, server_name, **kwargs):
+ self.macaroon_secret_key = config.get(
+ "macaroon_secret_key", self.registration_shared_secret
+ )
+
+ if not self.macaroon_secret_key:
+ # Unfortunately, there are people out there that don't have this
+ # set. Lets just be "nice" and derive one from their secret key.
+ logger.warn("Config is missing missing macaroon_secret_key")
+ seed = self.signing_key[0].seed
+ self.macaroon_secret_key = hashlib.sha256(seed)
+
+ def default_config(self, config_dir_path, server_name, is_generating_file=False,
+ **kwargs):
base_key_name = os.path.join(config_dir_path, server_name)
+
+ if is_generating_file:
+ macaroon_secret_key = random_string_with_symbols(50)
+ else:
+ macaroon_secret_key = None
+
return """\
+ macaroon_secret_key: "%(macaroon_secret_key)s"
+
## Signing Keys ##
# Path to the signing key to sign messages with
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 9b6dacc5b8..ab062d528c 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -32,26 +32,14 @@ class RegistrationConfig(Config):
)
self.registration_shared_secret = config.get("registration_shared_secret")
- self.macaroon_secret_key = config.get("macaroon_secret_key")
- if self.macaroon_secret_key is None:
- raise Exception(
- "Config is missing missing macaroon_secret_key - please set it"
- " in your config file."
- )
+
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
self.allow_guest_access = config.get("allow_guest_access", False)
- def default_config(self, is_generating_file=False, **kwargs):
+ def default_config(self, **kwargs):
registration_shared_secret = random_string_with_symbols(50)
- macaroon_line = ""
- if is_generating_file:
- macaroon_line += '\n macaroon_secret_key: "%s"\n' % (
- random_string_with_symbols(50),
- )
-
- macaroon_secret_key = random_string_with_symbols(50)
return """\
## Registration ##
@@ -61,7 +49,7 @@ class RegistrationConfig(Config):
# If set, allows registration by anyone who also has the shared
# secret, even if registration is otherwise disabled.
registration_shared_secret: "%(registration_shared_secret)s"
-%(macaroon_line)s
+
# Set the number of bcrypt rounds used to generate password hash.
# Larger numbers increase the work factor needed to generate the hash.
# The default number of rounds is 12.
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index f51200d18e..8a475417a6 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -20,3 +20,4 @@ class EventContext(object):
self.current_state = current_state
self.state_group = None
self.rejected = False
+ self.push_actions = []
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index fa83d3e464..064e8723c8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -53,25 +53,10 @@ class BaseHandler(object):
self.event_builder_factory = hs.get_event_builder_factory()
@defer.inlineCallbacks
- def _filter_events_for_clients(self, user_tuples, events):
+ def _filter_events_for_clients(self, user_tuples, events, event_id_to_state):
""" Returns dict of user_id -> list of events that user is allowed to
see.
"""
- # If there is only one user, just get the state for that one user,
- # otherwise just get all the state.
- if len(user_tuples) == 1:
- types = (
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_tuples[0][0]),
- )
- else:
- types = None
-
- event_id_to_state = yield self.store.get_state_for_events(
- frozenset(e.event_id for e in events),
- types=types
- )
-
forgotten = yield defer.gatherResults([
self.store.who_forgot_in_room(
room_id,
@@ -135,7 +120,17 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _filter_events_for_client(self, user_id, events, is_peeking=False):
# Assumes that user has at some point joined the room if not is_guest.
- res = yield self._filter_events_for_clients([(user_id, is_peeking)], events)
+ types = (
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, user_id),
+ )
+ event_id_to_state = yield self.store.get_state_for_events(
+ frozenset(e.event_id for e in events),
+ types=types
+ )
+ res = yield self._filter_events_for_clients(
+ [(user_id, is_peeking)], events, event_id_to_state
+ )
defer.returnValue(res.get(user_id, []))
def ratelimit(self, user_id):
@@ -269,13 +264,13 @@ class BaseHandler(object):
"You don't have permission to redact events"
)
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
-
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
- event, self
+ event, context, self
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
)
destinations = set()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b78b0502d9..da55d43541 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -236,12 +236,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- if not backfilled and not event.internal_metadata.is_outlier():
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, self
- )
-
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
@@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
+ if not backfilled and not event.internal_metadata.is_outlier():
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context, self
+ )
+
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3f1cda5b0b..ddeed27965 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -19,6 +19,7 @@ from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.metrics import Measure
from twisted.internet import defer
@@ -178,18 +179,6 @@ class SyncHandler(BaseHandler):
else:
return self.incremental_sync_with_gap(sync_config, since_token)
- def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
- if room_id not in ephemeral_by_room:
- return None
- for e in ephemeral_by_room[room_id]:
- if e['type'] != 'm.receipt':
- continue
- for receipt_event_id, val in e['content'].items():
- if 'm.read' in val:
- if user_id in val['m.read']:
- return receipt_event_id
- return None
-
@defer.inlineCallbacks
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
@@ -318,7 +307,6 @@ class SyncHandler(BaseHandler):
ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room,
- all_ephemeral_by_room=ephemeral_by_room,
batch=batch,
full_state=True,
)
@@ -368,50 +356,51 @@ class SyncHandler(BaseHandler):
typing events for that room.
"""
- typing_key = since_token.typing_key if since_token else "0"
+ with Measure(self.clock, "ephemeral_by_room"):
+ typing_key = since_token.typing_key if since_token else "0"
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
- typing_source = self.event_sources.sources["typing"]
- typing, typing_key = yield typing_source.get_new_events(
- user=sync_config.user,
- from_key=typing_key,
- limit=sync_config.filter_collection.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("typing_key", typing_key)
-
- ephemeral_by_room = {}
-
- for event in typing:
- # we want to exclude the room_id from the event, but modifying the
- # result returned by the event source is poor form (it might cache
- # the object)
- room_id = event["room_id"]
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
-
- receipt_key = since_token.receipt_key if since_token else "0"
-
- receipt_source = self.event_sources.sources["receipt"]
- receipts, receipt_key = yield receipt_source.get_new_events(
- user=sync_config.user,
- from_key=receipt_key,
- limit=sync_config.filter_collection.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+ typing_source = self.event_sources.sources["typing"]
+ typing, typing_key = yield typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=typing_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+ ephemeral_by_room = {}
+
+ for event in typing:
+ # we want to exclude the room_id from the event, but modifying the
+ # result returned by the event source is poor form (it might cache
+ # the object)
+ room_id = event["room_id"]
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ receipt_key = since_token.receipt_key if since_token else "0"
+
+ receipt_source = self.event_sources.sources["receipt"]
+ receipts, receipt_key = yield receipt_source.get_new_events(
+ user=sync_config.user,
+ from_key=receipt_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("receipt_key", receipt_key)
- for event in receipts:
- room_id = event["room_id"]
- # exclude room id, as above
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+ for event in receipts:
+ room_id = event["room_id"]
+ # exclude room id, as above
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
defer.returnValue((now_token, ephemeral_by_room))
@@ -451,13 +440,6 @@ class SyncHandler(BaseHandler):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
- # We now fetch all ephemeral events for this room in order to get
- # this users current read receipt. This could almost certainly be
- # optimised.
- _, all_ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token
- )
-
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token
)
@@ -589,7 +571,6 @@ class SyncHandler(BaseHandler):
ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room,
- all_ephemeral_by_room=all_ephemeral_by_room,
batch=batch,
full_state=full_state,
)
@@ -619,58 +600,64 @@ class SyncHandler(BaseHandler):
"""
:returns a Deferred TimelineBatch
"""
- filtering_factor = 2
- timeline_limit = sync_config.filter_collection.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 10)
- max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
- end_key = room_key
-
- limited = recents is None or newly_joined_room or timeline_limit < len(recents)
-
- if recents is not None:
- recents = sync_config.filter_collection.filter_room_timeline(recents)
- recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- recents,
- is_peeking=sync_config.is_guest,
- )
- else:
- recents = []
-
- since_key = None
- if since_token and not newly_joined_room:
- since_key = since_token.room_key
-
- while limited and len(recents) < timeline_limit and max_repeat:
- events, end_key = yield self.store.get_room_events_stream_for_room(
- room_id,
- limit=load_limit + 1,
- from_key=since_key,
- to_key=end_key,
- )
- loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
- loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- loaded_recents,
- is_peeking=sync_config.is_guest,
- )
- loaded_recents.extend(recents)
- recents = loaded_recents
-
- if len(events) <= load_limit:
+ with Measure(self.clock, "load_filtered_recents"):
+ filtering_factor = 2
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
+ room_key = now_token.room_key
+ end_key = room_key
+
+ if recents is None or newly_joined_room or timeline_limit < len(recents):
+ limited = True
+ else:
limited = False
- break
- max_repeat -= 1
- if len(recents) > timeline_limit:
- limited = True
- recents = recents[-timeline_limit:]
- room_key = recents[0].internal_metadata.before
+ if recents is not None:
+ recents = sync_config.filter_collection.filter_room_timeline(recents)
+ recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ recents,
+ is_peeking=sync_config.is_guest,
+ )
+ else:
+ recents = []
+
+ since_key = None
+ if since_token and not newly_joined_room:
+ since_key = since_token.room_key
+
+ while limited and len(recents) < timeline_limit and max_repeat:
+ events, end_key = yield self.store.get_room_events_stream_for_room(
+ room_id,
+ limit=load_limit + 1,
+ from_key=since_key,
+ to_key=end_key,
+ )
+ loaded_recents = sync_config.filter_collection.filter_room_timeline(
+ events
+ )
+ loaded_recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ loaded_recents,
+ is_peeking=sync_config.is_guest,
+ )
+ loaded_recents.extend(recents)
+ recents = loaded_recents
- prev_batch_token = now_token.copy_and_replace(
- "room_key", room_key
- )
+ if len(events) <= load_limit:
+ limited = False
+ break
+ max_repeat -= 1
+
+ if len(recents) > timeline_limit:
+ limited = True
+ recents = recents[-timeline_limit:]
+ room_key = recents[0].internal_metadata.before
+
+ prev_batch_token = now_token.copy_and_replace(
+ "room_key", room_key
+ )
defer.returnValue(TimelineBatch(
events=recents,
@@ -683,7 +670,6 @@ class SyncHandler(BaseHandler):
since_token, now_token,
ephemeral_by_room, tags_by_room,
account_data_by_room,
- all_ephemeral_by_room,
batch, full_state=False):
state = yield self.compute_state_delta(
room_id, batch, sync_config, since_token, now_token,
@@ -714,7 +700,7 @@ class SyncHandler(BaseHandler):
if room_sync:
notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config, all_ephemeral_by_room
+ room_id, sync_config
)
if notifs is not None:
@@ -831,50 +817,53 @@ class SyncHandler(BaseHandler):
# updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
- if full_state:
- if batch:
- state = yield self.store.get_state_for_event(batch.events[0].event_id)
- else:
- state = yield self.get_state_at(
- room_id, stream_position=now_token
- )
+ with Measure(self.clock, "compute_state_delta"):
+ if full_state:
+ if batch:
+ state = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
+ else:
+ state = yield self.get_state_at(
+ room_id, stream_position=now_token
+ )
- timeline_state = {
- (event.type, event.state_key): event
- for event in batch.events if event.is_state()
- }
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
- state = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state,
- previous={},
- )
- elif batch.limited:
- state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
- )
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state,
+ previous={},
+ )
+ elif batch.limited:
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
+ )
- state_at_timeline_start = yield self.store.get_state_for_event(
- batch.events[0].event_id
- )
+ state_at_timeline_start = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
- timeline_state = {
- (event.type, event.state_key): event
- for event in batch.events if event.is_state()
- }
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
- state = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state_at_timeline_start,
- previous=state_at_previous_sync,
- )
- else:
- state = {}
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ previous=state_at_previous_sync,
+ )
+ else:
+ state = {}
- defer.returnValue({
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(state.values())
- })
+ defer.returnValue({
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ })
def check_joined_room(self, sync_config, state_delta):
"""
@@ -895,21 +884,24 @@ class SyncHandler(BaseHandler):
return False
@defer.inlineCallbacks
- def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
- last_unread_event_id = self.last_read_event_id_for_room_and_user(
- room_id, sync_config.user.to_string(), ephemeral_by_room
- )
-
- notifs = []
- if last_unread_event_id:
- notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
- room_id, sync_config.user.to_string(), last_unread_event_id
+ def unread_notifs_for_room_id(self, room_id, sync_config):
+ with Measure(self.clock, "unread_notifs_for_room_id"):
+ last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+ user_id=sync_config.user.to_string(),
+ room_id=room_id,
+ receipt_type="m.read"
)
- defer.returnValue(notifs)
- # There is no new information in this period, so your notification
- # count is whatever it was last time.
- defer.returnValue(None)
+ notifs = []
+ if last_unread_event_id:
+ notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+ room_id, sync_config.user.to_string(), last_unread_event_id
+ )
+ defer.returnValue(notifs)
+
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
def _action_has_highlight(actions):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 43bf600913..b16d0017df 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
from synapse.types import UserID
import logging
@@ -222,6 +223,7 @@ class TypingNotificationHandler(BaseHandler):
class TypingNotificationEventSource(object):
def __init__(self, hs):
self.hs = hs
+ self.clock = hs.get_clock()
self._handler = None
self._room_member_handler = None
@@ -247,19 +249,20 @@ class TypingNotificationEventSource(object):
}
def get_new_events(self, from_key, room_ids, **kwargs):
- from_key = int(from_key)
- handler = self.handler()
+ with Measure(self.clock, "typing.get_new_events"):
+ from_key = int(from_key)
+ handler = self.handler()
- events = []
- for room_id in room_ids:
- if room_id not in handler._room_serials:
- continue
- if handler._room_serials[room_id] <= from_key:
- continue
+ events = []
+ for room_id in room_ids:
+ if room_id not in handler._room_serials:
+ continue
+ if handler._room_serials[room_id] <= from_key:
+ continue
- events.append(self._make_event_for(room_id))
+ events.append(self._make_event_for(room_id))
- return events, handler._latest_room_serial
+ return events, handler._latest_room_serial
def get_current_key(self):
return self.handler()._latest_room_serial
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index b9522a8050..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -128,7 +128,8 @@ class Pusher(object):
try:
if wait > 0:
yield synapse.util.async.sleep(wait)
- yield self.get_and_dispatch()
+ with Measure(self.clock, "push"):
+ yield self.get_and_dispatch()
wait = 0
except:
if wait == 0:
@@ -150,27 +151,115 @@ class Pusher(object):
only_keys=("room", "receipt",),
)
- with Measure(self.clock, "push"):
- # limiting to 1 may get 1 event plus 1 presence event, so
- # pick out the actual event
- single_event = None
- read_receipt = None
- for c in chunk['chunk']:
- if 'event_id' in c: # Hmmm...
- single_event = c
- elif c['type'] == 'm.receipt':
- read_receipt = c
-
- have_updated_badge = False
- if read_receipt:
- for receipt_part in read_receipt['content'].values():
- if 'm.read' in receipt_part:
- if self.user_id in receipt_part['m.read'].keys():
- have_updated_badge = True
-
- if not single_event:
- if have_updated_badge:
- yield self.update_badge()
+ # limiting to 1 may get 1 event plus 1 presence event, so
+ # pick out the actual event
+ single_event = None
+ read_receipt = None
+ for c in chunk['chunk']:
+ if 'event_id' in c: # Hmmm...
+ single_event = c
+ elif c['type'] == 'm.receipt':
+ read_receipt = c
+
+ have_updated_badge = False
+ if read_receipt:
+ for receipt_part in read_receipt['content'].values():
+ if 'm.read' in receipt_part:
+ if self.user_id in receipt_part['m.read'].keys():
+ have_updated_badge = True
+
+ if not single_event:
+ if have_updated_badge:
+ yield self.update_badge()
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_token
+ )
+ return
+
+ if not self.alive:
+ return
+
+ processed = False
+
+ rule_evaluator = yield \
+ push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
+ self.user_id, self.profile_tag, single_event['room_id'], self.store
+ )
+
+ actions = yield rule_evaluator.actions_for_event(single_event)
+ tweaks = rule_evaluator.tweaks_for_actions(actions)
+
+ if 'notify' in actions:
+ self.badge = yield self._get_badge_count()
+ rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
+ self.has_unread = True
+ if isinstance(rejected, list) or isinstance(rejected, tuple):
+ processed = True
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we"
+ " didn't send it"), pk
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk, self.user_id
+ )
+ else:
+ if have_updated_badge:
+ yield self.update_badge()
+ processed = True
+
+ if not self.alive:
+ return
+
+ if processed:
+ self.backoff_delay = Pusher.INITIAL_BACKOFF
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token_and_success(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_token,
+ self.clock.time_msec()
+ )
+ if self.failing_since:
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since)
+ else:
+ if not self.failing_since:
+ self.failing_since = self.clock.time_msec()
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since
+ )
+
+ if (self.failing_since and
+ self.failing_since <
+ self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
+ # of old notifications.
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
+ self.user_id, self.pushkey)
+ self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token(
self.app_id,
@@ -178,114 +267,25 @@ class Pusher(object):
self.user_id,
self.last_token
)
- return
-
- if not self.alive:
- return
-
- processed = False
-
- rule_evaluator = yield \
- push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
- self.user_id, self.profile_tag, single_event['room_id'], self.store
- )
- actions = yield rule_evaluator.actions_for_event(single_event)
- tweaks = rule_evaluator.tweaks_for_actions(actions)
-
- if 'notify' in actions:
- self.badge = yield self._get_badge_count()
- rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
- self.has_unread = True
- if isinstance(rejected, list) or isinstance(rejected, tuple):
- processed = True
- for pk in rejected:
- if pk != self.pushkey:
- # for sanity, we only remove the pushkey if it
- # was the one we actually sent...
- logger.warn(
- ("Ignoring rejected pushkey %s because we"
- " didn't send it"), pk
- )
- else:
- logger.info(
- "Pushkey %s was rejected: removing",
- pk
- )
- yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_id
- )
- else:
- if have_updated_badge:
- yield self.update_badge()
- processed = True
-
- if not self.alive:
- return
-
- if processed:
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token_and_success(
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
- self.last_token,
- self.clock.time_msec()
+ self.failing_since
)
- if self.failing_since:
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since)
else:
- if not self.failing_since:
- self.failing_since = self.clock.time_msec()
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
-
- if (self.failing_since and
- self.failing_since <
- self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
- # we really only give up so that if the URL gets
- # fixed, we don't suddenly deliver a load
- # of old notifications.
- logger.warn("Giving up on a notification to user %s, "
- "pushkey %s",
- self.user_id, self.pushkey)
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token
- )
-
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
- else:
- logger.warn("Failed to dispatch push for user %s "
- "(failing for %dms)."
- "Trying again in %dms",
- self.user_id,
- self.clock.time_msec() - self.failing_since,
- self.backoff_delay)
- yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
- self.backoff_delay *= 2
- if self.backoff_delay > Pusher.MAX_BACKOFF:
- self.backoff_delay = Pusher.MAX_BACKOFF
+ logger.warn("Failed to dispatch push for user %s "
+ "(failing for %dms)."
+ "Trying again in %dms",
+ self.user_id,
+ self.clock.time_msec() - self.failing_since,
+ self.backoff_delay)
+ yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+ self.backoff_delay *= 2
+ if self.backoff_delay > Pusher.MAX_BACKOFF:
+ self.backoff_delay = Pusher.MAX_BACKOFF
def stop(self):
self.alive = False
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 1d2e558f9a..e0da0868ec 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
import logging
-from synapse.api.constants import EventTypes
-
logger = logging.getLogger(__name__)
@@ -36,21 +34,15 @@ class ActionGenerator:
# tag (ie. we just need all the users).
@defer.inlineCallbacks
- def handle_push_actions_for_event(self, event, handler):
- if event.type == EventTypes.Redaction and event.redacts is not None:
- yield self.store.remove_push_actions_for_event_id(
- event.room_id, event.redacts
- )
-
+ def handle_push_actions_for_event(self, event, context, handler):
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.hs, self.store
)
- actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)
-
- yield self.store.set_push_actions_for_event_and_users(
- event,
- [
- (uid, None, actions) for uid, actions in actions_by_user.items()
- ]
+ actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+ event, handler, context.current_state
)
+
+ context.push_actions = [
+ (uid, None, actions) for uid, actions in actions_by_user.items()
+ ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 20c60422bf..8ac5ceb9ef 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -98,25 +98,21 @@ class BulkPushRuleEvaluator:
self.store = store
@defer.inlineCallbacks
- def action_for_event_by_user(self, event, handler):
+ def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {}
users_dict = yield self.store.are_guests(self.rules_by_user.keys())
filtered_by_user = yield handler._filter_events_for_clients(
- users_dict.items(), [event]
+ users_dict.items(), [event], {event.event_id: current_state}
)
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
condition_cache = {}
- member_state = yield self.store.get_state_for_event(
- event.event_id,
- )
-
display_names = {}
- for ev in member_state.values():
+ for ev in current_state.values():
nm = ev.content.get("displayname", None)
if nm and ev.type == EventTypes.Member:
display_names[ev.state_key] = nm
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 5ec52707e7..e2f5eb7b29 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
class WhoisRestServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)$")
+ PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
@defer.inlineCallbacks
def on_GET(self, request, user_id):
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 9410ac527e..a6f8754e32 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
class PresenceStatusRestServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status$")
+ PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
@defer.inlineCallbacks
def on_GET(self, request, user_id):
@@ -73,7 +73,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
class PresenceListRestServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)$")
+ PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)")
@defer.inlineCallbacks
def on_GET(self, request, user_id):
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index aeda7bfa39..3c5a212920 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -23,7 +23,7 @@ import simplejson as json
class ProfileDisplaynameRestServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/displayname$")
+ PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/displayname")
@defer.inlineCallbacks
def on_GET(self, request, user_id):
@@ -60,7 +60,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
class ProfileAvatarURLRestServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/avatar_url$")
+ PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)/avatar_url")
@defer.inlineCallbacks
def on_GET(self, request, user_id):
@@ -97,7 +97,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
class ProfileRestServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)$")
+ PATTERNS = client_path_patterns("/profile/(?P<user_id>[^/]*)")
@defer.inlineCallbacks
def on_GET(self, request, user_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0a969f50b..d77a817682 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,8 +24,7 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
- @defer.inlineCallbacks
- def set_push_actions_for_event_and_users(self, event, tuples):
+ def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
:param event: the event set actions for
:param tuples: list of tuples of (user_id, profile_tag, actions)
@@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore):
'highlight': 1 if _action_has_highlight(actions) else 0,
})
- def f(txn):
- for uid, _, __ in tuples:
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid)
- )
- return self._simple_insert_many_txn(txn, "event_push_actions", values)
-
- yield self.runInteraction(
- "set_actions_for_event_and_users",
- f,
- )
+ for uid, _, __ in tuples:
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid)
+ )
+ self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
def get_unread_event_push_actions_by_room_for_user(
@@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(ret)
- @defer.inlineCallbacks
- def remove_push_actions_for_event_id(self, room_id, event_id):
- def f(txn):
- # Sad that we have to blow away the cache for the whole room here
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (room_id,)
- )
- txn.execute(
- "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
- (room_id, event_id)
- )
- yield self.runInteraction(
- "remove_push_actions_for_event_id",
- f
+ def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
+ # Sad that we have to blow away the cache for the whole room here
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id,)
+ )
+ txn.execute(
+ "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
+ (room_id, event_id)
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c6ed54721c..3a5c6ee4b1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True):
-
- # Remove the any existing cache entries for the event_ids
- for event, _ in events_and_contexts:
+ depth_updates = {}
+ for event, context in events_and_contexts:
+ # Remove the any existing cache entries for the event_ids
txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
if not backfilled:
txn.call_after(
self._events_stream_cache.entity_has_changed,
event.room_id, event.internal_metadata.stream_ordering,
)
- depth_updates = {}
- for event, _ in events_and_contexts:
- if event.internal_metadata.is_outlier():
- continue
- depth_updates[event.room_id] = max(
- event.depth, depth_updates.get(event.room_id, event.depth)
+ if not event.internal_metadata.is_outlier():
+ depth_updates[event.room_id] = max(
+ event.depth, depth_updates.get(event.room_id, event.depth)
+ )
+
+ if context.push_actions:
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event, context.push_actions
+ )
+
+ if event.type == EventTypes.Redaction and event.redacts is not None:
+ self._remove_push_actions_for_event_id_txn(
+ txn, event.room_id, event.redacts
)
for room_id, depth in depth_updates.items():
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index d782b8e25b..850736c85e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -211,7 +211,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
logger.debug("applied_delta_files: %s", applied_delta_files)
for v in range(start_ver, SCHEMA_VERSION + 1):
- logger.debug("Upgrading schema to v%d", v)
+ logger.info("Upgrading schema to v%d", v)
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 8068c73740..4202a6b3dc 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -46,6 +46,20 @@ class ReceiptsStore(SQLBaseStore):
desc="get_receipts_for_room",
)
+ @cached(num_args=3)
+ def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type):
+ return self._simple_select_one_onecol(
+ table="receipts_linearized",
+ keyvalues={
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id
+ },
+ retcol="event_id",
+ desc="get_own_receipt_for_user",
+ allow_none=True,
+ )
+
@cachedInlineCallbacks(num_args=2)
def get_receipts_for_user(self, user_id, receipt_type):
def f(txn):
@@ -226,6 +240,11 @@ class ReceiptsStore(SQLBaseStore):
room_id, stream_id
)
+ txn.call_after(
+ self.get_last_receipt_event_id_for_user.invalidate,
+ (user_id, room_id, receipt_type)
+ )
+
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 7c6899aa1c..c51b641125 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -48,33 +48,35 @@ block_db_txn_duration = metrics.register_distribution(
class Measure(object):
- __slots__ = ["clock", "name", "start_context", "start", "new_context"]
+ __slots__ = [
+ "clock", "name", "start_context", "start", "new_context", "ru_utime",
+ "ru_stime", "db_txn_count", "db_txn_duration"
+ ]
def __init__(self, clock, name):
self.clock = clock
self.name = name
self.start_context = None
self.start = None
- self.new_context = LoggingContext(self.name)
def __enter__(self):
self.start = self.clock.time_msec()
self.start_context = LoggingContext.current_context()
- self.new_context.__enter__()
+ if self.start_context:
+ self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
+ self.db_txn_count = self.start_context.db_txn_count
+ self.db_txn_duration = self.start_context.db_txn_duration
def __exit__(self, exc_type, exc_val, exc_tb):
- current_context = LoggingContext.current_context()
-
- self.new_context.__exit__(exc_type, exc_val, exc_tb)
- if exc_type is not None:
+ if exc_type is not None or not self.start_context:
return
duration = self.clock.time_msec() - self.start
block_timer.inc_by(duration, self.name)
- context = self.new_context
+ context = LoggingContext.current_context()
- if context != current_context:
+ if context != self.start_context:
logger.warn(
"Context have unexpectedly changed from '%s' to '%s'. (%r)",
context, self.start_context, self.name
@@ -87,7 +89,9 @@ class Measure(object):
ru_utime, ru_stime = context.get_resource_usage()
- block_ru_utime.inc_by(ru_utime, self.name)
- block_ru_stime.inc_by(ru_stime, self.name)
- block_db_txn_count.inc_by(context.db_txn_count, self.name)
- block_db_txn_duration.inc_by(context.db_txn_duration, self.name)
+ block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name)
+ block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name)
+ block_db_txn_count.inc_by(context.db_txn_count - self.db_txn_count, self.name)
+ block_db_txn_duration.inc_by(
+ context.db_txn_duration - self.db_txn_duration, self.name
+ )
|