From 7e2c89a37f3a5261f43b4d472b36219ac41dfb16 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Apr 2016 15:42:15 +0100 Subject: Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do: * Make badges work again * Remove old, unused code --- synapse/push/pusherpool.py | 75 +++++++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 34 deletions(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0b463c6fdb..b67ad455ea 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -16,9 +16,10 @@ from twisted.internet import defer -from .httppusher import HttpPusher +import pusher from synapse.push import PusherConfigException from synapse.util.logcontext import preserve_fn +from synapse.util.async import run_on_reactor import logging @@ -48,7 +49,7 @@ class PusherPool: # will then get pulled out of the database, # recreated, added and started: this means we have only one # code path adding pushers. - self._create_pusher({ + pusher.create_pusher(self.hs, { "user_name": user_id, "kind": kind, "app_id": app_id, @@ -58,10 +59,18 @@ class PusherPool: "ts": time_now_msec, "lang": lang, "data": data, - "last_token": None, + "last_stream_ordering": None, "last_success": None, "failing_since": None }) + + # create the pusher setting last_stream_ordering to the current maximum + # stream ordering in event_push_actions, so it will process + # pushes from this point onwards. + last_stream_ordering = ( + yield self.store.get_latest_push_action_stream_ordering() + ) + yield self.store.add_pusher( user_id=user_id, access_token=access_token, @@ -73,6 +82,7 @@ class PusherPool: pushkey_ts=time_now_msec, lang=lang, data=data, + last_stream_ordering=last_stream_ordering, profile_tag=profile_tag, ) yield self._refresh_pusher(app_id, pushkey, user_id) @@ -106,26 +116,19 @@ class PusherPool: ) yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) - def _create_pusher(self, pusherdict): - if pusherdict['kind'] == 'http': - return HttpPusher( - self.hs, - user_id=pusherdict['user_name'], - app_id=pusherdict['app_id'], - app_display_name=pusherdict['app_display_name'], - device_display_name=pusherdict['device_display_name'], - pushkey=pusherdict['pushkey'], - pushkey_ts=pusherdict['ts'], - data=pusherdict['data'], - last_token=pusherdict['last_token'], - last_success=pusherdict['last_success'], - failing_since=pusherdict['failing_since'] - ) - else: - raise PusherConfigException( - "Unknown pusher type '%s' for user %s" % - (pusherdict['kind'], pusherdict['user_name']) + @defer.inlineCallbacks + def on_new_notifications(self, min_stream_id, max_stream_id): + yield run_on_reactor() + try: + users_affected = yield self.store.get_push_action_users_in_range( + min_stream_id, max_stream_id ) + for u in users_affected: + if u in self.pushers: + for p in self.pushers[u].values(): + p.on_new_notifications(min_stream_id, max_stream_id) + except: + logger.exception("Exception in pusher on_new_notifications") @defer.inlineCallbacks def _refresh_pusher(self, app_id, pushkey, user_id): @@ -146,30 +149,34 @@ class PusherPool: logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: try: - p = self._create_pusher(pusherdict) + p = pusher.create_pusher(self.hs, pusherdict) except PusherConfigException: logger.exception("Couldn't start a pusher: caught PusherConfigException") continue if p: - fullid = "%s:%s:%s" % ( + appid_pushkey = "%s:%s" % ( pusherdict['app_id'], pusherdict['pushkey'], - pusherdict['user_name'] ) - if fullid in self.pushers: - self.pushers[fullid].stop() - self.pushers[fullid] = p - preserve_fn(p.start)() + byuser = self.pushers.setdefault(pusherdict['user_name'], {}) + + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p + preserve_fn(p.on_started)() logger.info("Started pushers") @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): - fullid = "%s:%s:%s" % (app_id, pushkey, user_id) - if fullid in self.pushers: - logger.info("Stopping pusher %s", fullid) - self.pushers[fullid].stop() - del self.pushers[fullid] + appid_pushkey = "%s:%s" % (app_id, pushkey) + + byuser = self.pushers.get(user_id, {}) + + if appid_pushkey in byuser: + logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) + byuser[appid_pushkey].on_stop() + del byuser[appid_pushkey] yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) -- cgit 1.4.1 From 92e3071623c34350bf072bb77e089d5d6d5f41c2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 15:39:53 +0100 Subject: Send badge count pushes. Also fix bugs with retrying. --- synapse/handlers/receipts.py | 21 +++++++++++++++++---- synapse/push/httppusher.py | 45 ++++++++++++++++++++++++++++---------------- synapse/push/pusherpool.py | 20 +++++++++++++++++++- synapse/storage/receipts.py | 9 ++++++--- 4 files changed, 71 insertions(+), 24 deletions(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 935c339707..26b0368080 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler): def _handle_new_receipts(self, receipts): """Takes a list of receipts, stores them and informs the notifier. """ + min_batch_id = None + max_batch_id = None + for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -97,10 +100,20 @@ class ReceiptsHandler(BaseHandler): stream_id, max_persisted_id = res - with PreserveLoggingContext(): - self.notifier.on_new_event( - "receipt_key", max_persisted_id, rooms=[room_id] - ) + if min_batch_id is None or stream_id < min_batch_id: + min_batch_id = stream_id + if max_batch_id is None or max_persisted_id > max_batch_id: + max_batch_id = max_persisted_id + + affected_room_ids = list(set([r["room_id"] for r in receipts])) + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_batch_id, rooms=affected_room_ids + ) + self.hs.get_pusherpool().on_new_receipts( + min_batch_id, max_batch_id, affected_room_ids + ) defer.returnValue(True) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index d695885649..0d5450bc01 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -76,15 +76,25 @@ class HttpPusher(object): self.data_minus_url.update(self.data) del self.data_minus_url['url'] + @defer.inlineCallbacks def on_started(self): - self._process() + yield self._process() + @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max_stream_ordering - self._process() + yield self._process() + + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id): + # We could check the receipts are actually m.read receipts here, + # but currently that's the only type of receipt anyway... + badge = yield push_tools.get_badge_count(self.hs, self.user_id) + yield self.send_badge(badge) + @defer.inlineCallbacks def on_timer(self): - self._process() + yield self._process() def on_stop(self): if self.timed_call: @@ -106,22 +116,24 @@ class HttpPusher(object): self.last_stream_ordering, self.clock.time_msec() ) - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, self.pushkey, self.user_id, - self.failing_since - ) + if self.failing_since: + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) else: - self.failing_since = self.clock.time_msec() - yield self.store.update_pusher_failing_since( - self.app_id, self.pushkey, self.user_id, - self.failing_since - ) + if not self.failing_since: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) if ( self.failing_since and self.failing_since < - self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER + self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS ): # we really only give up so that if the URL gets # fixed, we don't suddenly deliver a load @@ -148,7 +160,7 @@ class HttpPusher(object): else: logger.info("Push failed: delaying for %ds", self.backoff_delay) self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) - self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC) + self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) break @defer.inlineCallbacks @@ -191,7 +203,8 @@ class HttpPusher(object): d = { 'notification': { - 'id': event.event_id, + 'id': event.event_id, # deprecated: remove soon + 'event_id': event.event_id, 'room_id': event.room_id, 'type': event.type, 'sender': event.user_id, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b67ad455ea..7b1ce81e9a 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -126,10 +126,28 @@ class PusherPool: for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(min_stream_id, max_stream_id) + yield p.on_new_notifications(min_stream_id, max_stream_id) except: logger.exception("Exception in pusher on_new_notifications") + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + yield run_on_reactor() + try: + # Need to subtract 1 from the minimum because the lower bound here + # is not inclusive + updated_receipts = yield self.store.get_all_updated_receipts( + min_stream_id - 1, max_stream_id + ) + # This returns a tuple, user_id is at index 3 + users_affected = set([r[3] for r in updated_receipts]) + for u in users_affected: + if u in self.pushers: + for p in self.pushers[u].values(): + yield p.on_new_receipts(min_stream_id, max_stream_id) + except: + logger.exception("Exception in pusher on_new_receipts") + @defer.inlineCallbacks def _refresh_pusher(self, app_id, pushkey, user_id): resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 4befebc8e2..59d1ac0314 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore): } ) - def get_all_updated_receipts(self, last_id, current_id, limit): + def get_all_updated_receipts(self, last_id, current_id, limit=None): def get_all_updated_receipts_txn(txn): sql = ( "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" " FROM receipts_linearized" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_id, current_id, limit)) + args = [last_id, current_id] + if limit is not None: + sql += " LIMIT ?" + args.append(limit) + txn.execute(sql, args) return txn.fetchall() return self.runInteraction( -- cgit 1.4.1 From 25cd5bb697996c1764c7746e4dfc1d8fffaaf8b2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 17:22:14 +0100 Subject: defer.gatherResults rather than doing all the pokes in series --- synapse/push/pusherpool.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 7b1ce81e9a..8da444179c 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -123,10 +123,17 @@ class PusherPool: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id ) + + deferreds = [] + for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - yield p.on_new_notifications(min_stream_id, max_stream_id) + deferreds.append( + p.on_new_notifications(min_stream_id, max_stream_id) + ) + + yield defer.gatherResults(deferreds) except: logger.exception("Exception in pusher on_new_notifications") @@ -141,10 +148,17 @@ class PusherPool: ) # This returns a tuple, user_id is at index 3 users_affected = set([r[3] for r in updated_receipts]) + + deferreds = [] + for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - yield p.on_new_receipts(min_stream_id, max_stream_id) + deferreds.append( + p.on_new_receipts(min_stream_id, max_stream_id) + ) + + yield defer.gatherResults(deferreds) except: logger.exception("Exception in pusher on_new_receipts") -- cgit 1.4.1 From 6ec02e9ecf7ffe3d3737a69a480939a07d62428b Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 17:24:05 +0100 Subject: indenting --- synapse/push/pusherpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 8da444179c..ba513601e7 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -155,7 +155,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_receipts(min_stream_id, max_stream_id) + p.on_new_receipts(min_stream_id, max_stream_id) ) yield defer.gatherResults(deferreds) -- cgit 1.4.1 From 65141161f6ead75c4f07c548447704a302686ebf Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 12 Apr 2016 16:25:26 +0100 Subject: Unused member variable --- synapse/push/pusherpool.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index ba513601e7..aa095f9d9b 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -32,7 +32,6 @@ class PusherPool: self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() self.pushers = {} - self.last_pusher_started = -1 @defer.inlineCallbacks def start(self): -- cgit 1.4.1 From a3ac837599f62b77f458505f841cee6072c1f921 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 17:21:02 +0100 Subject: Optionally split out the pushers into a separate process --- synapse/app/pusher.py | 208 +++++++++++++++++++++++++++++++++++++++++++++ synapse/config/server.py | 1 + synapse/push/httppusher.py | 2 +- synapse/push/pusherpool.py | 4 + synapse/server.py | 3 + 5 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 synapse/app/pusher.py (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py new file mode 100644 index 0000000000..8922573db7 --- /dev/null +++ b/synapse/app/pusher.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.util.versionstring import get_version_string +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.storage.engines import create_engine +from synapse.storage import DataStore +from synapse.util.async import sleep +from synapse.util.logcontext import (LoggingContext, preserve_fn) + +from twisted.internet import reactor, defer + +import sys +import logging + +logger = logging.getLogger("synapse.app.pusher") + + +class SlaveConfig(DatabaseConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.start_pushers = True + + def default_config(self, **kwargs): + return """\ + ## Slave ## + #replication_url: https://localhost:{replication_port}/_synapse/replication + + report_stats: False + """ + + +class PusherSlaveConfig(SlaveConfig, LoggingConfig): + pass + + +class PusherSlaveStore( + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore +): + update_pusher_last_stream_ordering_and_success = ( + DataStore.update_pusher_last_stream_ordering_and_success.__func__ + ) + + +class PusherServer(HomeServer): + + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = PusherSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def remove_pusher(self, app_id, push_key, user_id): + http_client = self.get_simple_http_client() + replication_url = self.config.replication_url + url = replication_url + "/remove_pushers" + return http_client.post_json_get_json(url, { + "remove": [{ + "app_id": app_id, + "push_key": push_key, + "user_id": user_id, + }] + }) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + pusher_pool = self.get_pusherpool() + + def stop_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + def start_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return pusher_pool._refresh_pusher(app_id, pushkey, user_id) + + @defer.inlineCallbacks + def poke_pushers(results): + pushers_rows = set( + map(tuple, results.get("pushers", {}).get("rows", [])) + ) + deleted_pushers_rows = set( + map(tuple, results.get("deleted_pushers", {}).get("rows", [])) + ) + for row in sorted(pushers_rows | deleted_pushers_rows): + if row in deleted_pushers_rows: + user_id, app_id, pushkey = row[1:4] + stop_pusher(user_id, app_id, pushkey) + elif row in pushers_rows: + user_id = row[1] + app_id = row[5] + pushkey = row[8] + yield start_pusher(user_id, app_id, pushkey) + + stream = results.get("events") + if stream: + min_stream_id = stream["rows"][0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_notifications)( + min_stream_id, max_stream_id + ) + + stream = results.get("receipts") + if stream: + rows = stream["rows"] + affected_room_ids = set(row[1] for row in rows) + min_stream_id = rows[0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_receipts)( + min_stream_id, max_stream_id, affected_room_ids + ) + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + poke_pushers(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(30) + + +def setup(config_options): + try: + config = PusherSlaveConfig.load_config( + "Synapse pusher", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ps = PusherServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ps.setup() + + def start(): + ps.replicate() + ps.get_pusherpool().start() + ps.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + return ps + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + reactor.run() diff --git a/synapse/config/server.py b/synapse/config/server.py index df4707e1d1..46c633548a 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -28,6 +28,7 @@ class ServerConfig(Config): self.print_pidfile = config.get("print_pidfile") self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) + self.start_pushers = config.get("start_pushers", True) self.listeners = config.get("listeners", []) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 6950a20632..3992804845 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -230,7 +230,7 @@ class HttpPusher(object): "Pushkey %s was rejected: removing", pk ) - yield self.hs.get_pusherpool().remove_pusher( + yield self.hs.remove_pusher( self.app_id, pk, self.user_id ) defer.returnValue(True) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index aa095f9d9b..6ef48d63f7 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) class PusherPool: def __init__(self, _hs): self.hs = _hs + self.start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() self.pushers = {} @@ -177,6 +178,9 @@ class PusherPool: self._start_pushers([p]) def _start_pushers(self, pushers): + if not self.start_pushers: + logger.info("Not starting pushers because they are disabled in the config") + return logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: try: diff --git a/synapse/server.py b/synapse/server.py index 368d615576..ee138de756 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -193,6 +193,9 @@ class HomeServer(object): **self.db_config.get("args", {}) ) + def remove_pusher(self, app_id, push_key, user_id): + return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + def _make_dependency_method(depname): def _get(hs): -- cgit 1.4.1 From b2c04da8dc98ca09620dc207c95f68b2e8a52e62 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 11:43:57 +0100 Subject: Add an email pusher for new users If they registered with an email address and email notifs are enabled on the HS --- synapse/push/pusherpool.py | 1 + synapse/rest/client/v2_alpha/register.py | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 6ef48d63f7..7fef2fb6f7 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -50,6 +50,7 @@ class PusherPool: # recreated, added and started: this means we have only one # code path adding pushers. pusher.create_pusher(self.hs, { + "id": None, "user_name": user_id, "kind": kind, "app_id": app_id, diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index ff8f69ddbf..883b1c1291 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -48,6 +48,7 @@ class RegisterRestServlet(RestServlet): super(RegisterRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() + self.store = hs.get_datastore() self.auth_handler = hs.get_handlers().auth_handler self.registration_handler = hs.get_handlers().registration_handler self.identity_handler = hs.get_handlers().identity_handler @@ -214,6 +215,31 @@ class RegisterRestServlet(RestServlet): threepid['validated_at'], ) + # And we add an email pusher for them by default, but only + # if email notifications are enabled (so people don't start + # getting mail spam where they weren't before if email + # notifs are set up on a home server) + if self.hs.config.email_enable_notifs: + # Pull the ID of the access token back out of the db + # It would really make more sense for this to be passed + # up when the access token is saved, but that's quite an + # invasive change I'd rather do separately. + user_tuple = yield self.store.get_user_by_access_token( + token + ) + + yield self.hs.get_pusherpool().add_pusher( + user_id=user_id, + access_token=user_tuple["token_id"], + kind="email", + app_id="m.email", + app_display_name="Email Notifications", + device_display_name=threepid["address"], + pushkey=threepid["address"], + lang=None, # We don't know a user's language here + data={}, + ) + if 'bind_email' in params and params['bind_email']: logger.info("bind_email specified: binding") -- cgit 1.4.1 From 92f0f3d21d52ae14a3d4d4536a84055b92d228ae Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 May 2016 11:24:07 +0100 Subject: Catch all exceptions when creating a pusher --- synapse/push/pusherpool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 7fef2fb6f7..66eafb69d8 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -186,8 +186,8 @@ class PusherPool: for pusherdict in pushers: try: p = pusher.create_pusher(self.hs, pusherdict) - except PusherConfigException: - logger.exception("Couldn't start a pusher: caught PusherConfigException") + except: + logger.exception("Couldn't start a pusher: caught Exception") continue if p: appid_pushkey = "%s:%s" % ( -- cgit 1.4.1 From e6bffa4475e1a37643317d709b2003cdf99c149f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 May 2016 11:26:58 +0100 Subject: Unused import --- synapse/push/pusherpool.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 66eafb69d8..5853ec36a9 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,6 @@ from twisted.internet import defer import pusher -from synapse.push import PusherConfigException from synapse.util.logcontext import preserve_fn from synapse.util.async import run_on_reactor -- cgit 1.4.1