From 07d765209dea12229e70a09784e647611acabcda Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 19 Apr 2016 14:24:36 +0100 Subject: First bits of emailpusher Mostly logic of when to send an email --- synapse/storage/event_push_actions.py | 57 +++++++++++++++++++--- synapse/storage/events.py | 2 + synapse/storage/pusher.py | 27 ++++++++++ synapse/storage/schema/delta/31/events.sql | 16 ++++++ .../storage/schema/delta/31/pusher_throttle.sql | 23 +++++++++ 5 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 synapse/storage/schema/delta/31/events.sql create mode 100644 synapse/storage/schema/delta/31/pusher_throttle.sql (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 86a98b6f11..ad512b2f07 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -118,15 +118,17 @@ class EventPushActionsStore(SQLBaseStore): max_stream_ordering=None): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" ") AS rl " + "NATURAL JOIN events e " "WHERE" " ep.room_id = rl.room_id" " AND (" @@ -153,8 +155,10 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " "WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" " WHERE receipt_type = 'm.read' AND user_id = ? " @@ -175,11 +179,30 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) + @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts " + "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " + "WHERE ep.stream_ordering > ? " + "ORDER BY ep.stream_ordering ASC " + "LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result is not None else None) + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): @@ -190,6 +213,26 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(result[0] or 0) + @defer.inlineCallbacks + def get_time_of_latest_push_action_by_room_for_user(self, user_id): + """ + Returns only the received_ts of the last notification in each of the + user's rooms, in a dict by room_id + """ + def f(txn): + txn.execute( + "SELECT ep.room_id, MAX(e.received_ts) " + "FROM event_push_actions AS ep " + "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " + "GROUP BY ep.room_id" + ) + return txn.fetchall() + result = yield self.runInteraction( + "get_time_of_latest_push_action_by_room_for_user", f + ) + + defer.returnValue({row[0]: row[1] for row in result}) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 21487724ed..dd58e001dc 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -55,6 +55,7 @@ class EventsStore(SQLBaseStore): def __init__(self, hs): super(EventsStore, self).__init__(hs) + self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) @@ -427,6 +428,7 @@ class EventsStore(SQLBaseStore): "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), } for event, _ in events_and_contexts ], diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index e5755c0aea..caef9b59a5 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -230,3 +230,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) \ No newline at end of file diff --git a/synapse/storage/schema/delta/31/events.sql b/synapse/storage/schema/delta/31/events.sql new file mode 100644 index 0000000000..1dd0f9e170 --- /dev/null +++ b/synapse/storage/schema/delta/31/events.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN received_ts BIGINT; diff --git a/synapse/storage/schema/delta/31/pusher_throttle.sql b/synapse/storage/schema/delta/31/pusher_throttle.sql new file mode 100644 index 0000000000..d86d30c13c --- /dev/null +++ b/synapse/storage/schema/delta/31/pusher_throttle.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE pusher_throttle( + pusher BIGINT NOT NULL, + room_id TEXT NOT NULL, + last_sent_ts BIGINT, + throttle_ms BIGINT, + PRIMARY KEY (pusher, room_id) +); -- cgit 1.5.1 From f63bd4ff4704c9f7b6e23c76720dbd955a60c058 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 20 Apr 2016 13:02:01 +0100 Subject: Send a rather basic email notif Also pep8 fixes --- synapse/config/emailconfig.py | 62 +++++++++++++++++++++++++++++++++++ synapse/config/homeserver.py | 3 +- synapse/push/emailpusher.py | 32 +++++++++++++----- synapse/push/mailer.py | 48 +++++++++++++++++++++++++++ synapse/storage/event_push_actions.py | 1 - synapse/storage/pusher.py | 2 +- 6 files changed, 136 insertions(+), 12 deletions(-) create mode 100644 synapse/config/emailconfig.py create mode 100644 synapse/push/mailer.py (limited to 'synapse/storage') diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py new file mode 100644 index 0000000000..978826627b --- /dev/null +++ b/synapse/config/emailconfig.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file can't be called email.py because if it is, we cannot: +import email.utils + +from ._base import Config + + +class EmailConfig(Config): + """ + Email Configuration + """ + + def read_config(self, config): + email_config = config.get("email", None) + if email_config: + self.email_enable_notifs = email_config.get("enable_notifs", True) + if ( + "smtp_host" not in email_config or + "smtp_port" not in email_config or + "notif_from" not in email_config + ): + raise RuntimeError( + "You must set smtp_host, smtp_port and notif_from " + "to send email notifications" + ) + + self.email_smtp_host = email_config["smtp_host"] + self.email_smtp_port = email_config["smtp_port"] + self.email_notif_from = email_config["notif_from"] + + # make sure it's valid + parsed = email.utils.parseaddr(self.email_notif_from) + if parsed[1] == '': + raise RuntimeError("Invalid notif_from address") + else: + self.email_enable_notifs = False + self.email_smtp_host = None + self.email_smtp_port = None + self.email_notif_from = None + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Enable sending emails for notification events + #email_config: + # enable_notifs: false + # smtp_host: "localhost" + # smtp_port: 25 + """ diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 9a80ac39ec..fc2445484c 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -31,13 +31,14 @@ from .cas import CasConfig from .password import PasswordConfig from .jwt import JWTConfig from .ldap import LDAPConfig +from .emailconfig import EmailConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, LDAPConfig, PasswordConfig,): + JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,): pass diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 74e3a70562..820c8f8467 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,9 +18,10 @@ from twisted.internet import defer, reactor import logging from synapse.util.metrics import Measure -from synapse.util.async import run_on_reactor from synapse.util.logcontext import LoggingContext +from mailer import Mailer + logger = logging.getLogger(__name__) # The amount of time we always wait before ever emailing about a notification @@ -28,11 +29,11 @@ logger = logging.getLogger(__name__) DELAY_BEFORE_MAIL_MS = 2 * 60 * 1000 THROTTLE_START_MS = 2 * 60 * 1000 -THROTTLE_MAX_MS = (2 * 60 * 1000) * (2**11) # ~3 days +THROTTLE_MAX_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days # If no event triggers a notification for this long after the previous, # the throttle is released. -THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2**11) # ~3 days +THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days class EmailPusher(object): @@ -59,12 +60,22 @@ class EmailPusher(object): self.processing = False + if self.hs.config.email_enable_notifs: + self.mailer = Mailer( + self.store, + self.hs.config.email_smtp_host, self.hs.config.email_smtp_port, + self.hs.config.email_notif_from, + ) + else: + self.mailer = None + @defer.inlineCallbacks def on_started(self): - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + if self.mailer is not None: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() def on_stop(self): if self.timed_call: @@ -102,6 +113,7 @@ class EmailPusher(object): finally: self.processing = False + @defer.inlineCallbacks def _unsafe_process(self): """ Main logic of the push loop without the wrapper function that sets @@ -241,5 +253,7 @@ class EmailPusher(object): @defer.inlineCallbacks def send_notification(self, push_action): - yield run_on_reactor() - logger.error("sending notif email for user %r", self.user_id) \ No newline at end of file + logger.info("Sending notif email for user %r", self.user_id) + yield self.mailer.send_notification_mail( + self.user_id, self.email, push_action + ) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py new file mode 100644 index 0000000000..93d3866ec7 --- /dev/null +++ b/synapse/push/mailer.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import smtplib +import email.utils +import email.mime.multipart +from email.mime.text import MIMEText + + +class Mailer(object): + def __init__(self, store, smtp_host, smtp_port, notif_from): + self.store = store + self.smtp_host = smtp_host + self.smtp_port = smtp_port + self.notif_from = notif_from + + @defer.inlineCallbacks + def send_notification_mail(self, user_id, email_address, push_action): + raw_from = email.utils.parseaddr(self.notif_from)[1] + raw_to = email.utils.parseaddr(email_address)[1] + + if raw_to == '': + raise RuntimeError("Invalid 'to' address") + + plainText = "yo dawg, you got notifications!" + + text_part = MIMEText(plainText, "plain") + text_part['Subject'] = "New Matrix Notifications" + text_part['From'] = self.notif_from + text_part['To'] = email_address + + smtp = smtplib.SMTP(self.smtp_host, self.smtp_port) + smtp.sendmail(raw_from, raw_to, text_part.as_string()) + smtp.quit() \ No newline at end of file diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index ad512b2f07..f2af8bdb36 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -202,7 +202,6 @@ class EventPushActionsStore(SQLBaseStore): result = yield self.runInteraction("get_time_of_last_push_action_before", f) defer.returnValue(result[0] if result is not None else None) - @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index caef9b59a5..5fb47d418a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -256,4 +256,4 @@ class PusherStore(SQLBaseStore): {"pusher": pusher_id, "room_id": room_id}, params, desc="set_throttle_params" - ) \ No newline at end of file + ) -- cgit 1.5.1 From 50ad8005e4b3e8054f990f34d2d7735b09dc8c19 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 19:16:15 +0100 Subject: Put spaces at start of line --- synapse/storage/event_push_actions.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f2af8bdb36..5b9a4ca60d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -155,13 +155,13 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " - "e.received_ts " - "FROM event_push_actions AS ep " - "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " - "WHERE ep.room_id not in (" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ? " + " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" ") AND ep.user_id = ? AND ep.stream_ordering > ?" ) @@ -190,12 +190,12 @@ class EventPushActionsStore(SQLBaseStore): def get_time_of_last_push_action_before(self, stream_ordering): def f(txn): sql = ( - "SELECT e.received_ts " - "FROM event_push_actions AS ep " - "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " - "WHERE ep.stream_ordering > ? " - "ORDER BY ep.stream_ordering ASC " - "LIMIT 1" + "SELECT e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.stream_ordering > ?" + " ORDER BY ep.stream_ordering ASC" + " LIMIT 1" ) txn.execute(sql, (stream_ordering,)) return txn.fetchone() @@ -220,10 +220,10 @@ class EventPushActionsStore(SQLBaseStore): """ def f(txn): txn.execute( - "SELECT ep.room_id, MAX(e.received_ts) " - "FROM event_push_actions AS ep " - "JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id " - "GROUP BY ep.room_id" + "SELECT ep.room_id, MAX(e.received_ts)" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " GROUP BY ep.room_id" ) return txn.fetchall() result = yield self.runInteraction( -- cgit 1.5.1 From 60f44c098d14754d0159c2ccc2c888e7ca970427 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 19:17:10 +0100 Subject: Remove unnecessary if --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5b9a4ca60d..c6625a7b08 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -200,7 +200,7 @@ class EventPushActionsStore(SQLBaseStore): txn.execute(sql, (stream_ordering,)) return txn.fetchone() result = yield self.runInteraction("get_time_of_last_push_action_before", f) - defer.returnValue(result[0] if result is not None else None) + defer.returnValue(result[0] if result else None) @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): -- cgit 1.5.1 From 8f99cd5996a11bb1cc50dce35542b2591426fdbc Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 19:27:03 +0100 Subject: Oops, actually specify the user id --- synapse/storage/event_push_actions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index c6625a7b08..974ceb3389 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -223,7 +223,9 @@ class EventPushActionsStore(SQLBaseStore): "SELECT ep.room_id, MAX(e.received_ts)" " FROM event_push_actions AS ep" " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " GROUP BY ep.room_id" + " WHERE ep.user_id = ?" + " GROUP BY ep.room_id", + (user_id,) ) return txn.fetchall() result = yield self.runInteraction( -- cgit 1.5.1 From b0a1036d93eaec50839c95bf0dc621826342ea62 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 19:28:56 +0100 Subject: Use explicit join --- synapse/storage/event_push_actions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 974ceb3389..b710101b15 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -127,9 +127,9 @@ class EventPushActionsStore(SQLBaseStore): " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" - ") AS rl " - "NATURAL JOIN events e " - "WHERE" + ") AS rl" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" " ep.room_id = rl.room_id" " AND (" " ep.topological_ordering > rl.topological_ordering" -- cgit 1.5.1 From 35b7b8e4bccd8cfbe44b2ffee97cdee0a48701ba Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 20:10:34 +0100 Subject: Remove unused function --- synapse/push/emailpusher.py | 4 ---- synapse/storage/event_push_actions.py | 22 ---------------------- 2 files changed, 26 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 8b105b85c8..c10deded06 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -122,10 +122,6 @@ class EmailPusher(object): up logging, measures and guards against multiple instances of it being run. """ - last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user( - self.user_id - ) - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index b710101b15..85290d2a90 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -212,28 +212,6 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(result[0] or 0) - @defer.inlineCallbacks - def get_time_of_latest_push_action_by_room_for_user(self, user_id): - """ - Returns only the received_ts of the last notification in each of the - user's rooms, in a dict by room_id - """ - def f(txn): - txn.execute( - "SELECT ep.room_id, MAX(e.received_ts)" - " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.user_id = ?" - " GROUP BY ep.room_id", - (user_id,) - ) - return txn.fetchall() - result = yield self.runInteraction( - "get_time_of_latest_push_action_by_room_for_user", f - ) - - defer.returnValue({row[0]: row[1] for row in result}) - def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( -- cgit 1.5.1 From 80be39646467c46a52530cd0839746810ad32b62 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 May 2016 13:19:59 +0100 Subject: Correct SQL statement for postgres In standard sql, join binds tighter than comma, so we were joining on the wrong table. Postgres follows the standard (apparently). --- synapse/storage/event_push_actions.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 85290d2a90..6f316f7d24 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -120,14 +120,15 @@ class EventPushActionsStore(SQLBaseStore): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " "e.received_ts " - "FROM event_push_actions AS ep, (" + "FROM (" " SELECT room_id, user_id, " " max(topological_ordering) as topological_ordering, " " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" - ") AS rl" + ") AS rl," + " event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" " ep.room_id = rl.room_id" -- cgit 1.5.1