diff --git a/CHANGES.rst b/CHANGES.rst
index da241666d6..c8856d9575 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,17 @@
+Changes in synapse v0.19.2 (2017-02-20)
+=======================================
+
+* Fix bug with event visibility check in /context/ API. Thanks to Tokodomo for
+ pointing it out! (PR #1929)
+
+
+Changes in synapse v0.19.1 (2017-02-09)
+=======================================
+
+* Fix bug where state was incorrectly reset in a room when synapse received an
+ event over federation that did not pass auth checks (PR #1892)
+
+
Changes in synapse v0.19.0 (2017-02-04)
=======================================
diff --git a/README.rst b/README.rst
index 77e0b470a3..a73c71c77e 100644
--- a/README.rst
+++ b/README.rst
@@ -332,9 +332,8 @@ https://obs.infoserver.lv/project/monitor/matrix-synapse
ArchLinux
---------
-The quickest way to get up and running with ArchLinux is probably with Ivan
-Shapovalov's AUR package from
-https://aur.archlinux.org/packages/matrix-synapse/, which should pull in all
+The quickest way to get up and running with ArchLinux is probably with the community package
+https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in all
the necessary dependencies.
Alternatively, to install using pip a few changes may be needed as ArchLinux
diff --git a/contrib/cmdclient/console.py b/contrib/cmdclient/console.py
index 8bb03ce66a..4918fa1a9a 100755
--- a/contrib/cmdclient/console.py
+++ b/contrib/cmdclient/console.py
@@ -32,7 +32,7 @@ import urlparse
import nacl.signing
import nacl.encoding
-from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException
+from signedjson.sign import verify_signed_json, SignatureVerifyException
CONFIG_JSON = "cmdclient_config.json"
diff --git a/contrib/example_log_config.yaml b/contrib/example_log_config.yaml
new file mode 100644
index 0000000000..7f7c8ba588
--- /dev/null
+++ b/contrib/example_log_config.yaml
@@ -0,0 +1,48 @@
+# Example log_config file for synapse. To enable, point `log_config` to it in
+# `homeserver.yaml`, and restart synapse.
+#
+# This configuration will produce similar results to the defaults within
+# synapse, but can be edited to give more flexibility.
+
+version: 1
+
+formatters:
+ fmt:
+ format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
+
+filters:
+ context:
+ (): synapse.util.logcontext.LoggingContextFilter
+ request: ""
+
+handlers:
+ # example output to console
+ console:
+ class: logging.StreamHandler
+ filters: [context]
+
+ # example output to file - to enable, edit 'root' config below.
+ file:
+ class: logging.handlers.RotatingFileHandler
+ formatter: fmt
+ filename: /var/log/synapse/homeserver.log
+ maxBytes: 100000000
+ backupCount: 3
+ filters: [context]
+
+
+root:
+ level: INFO
+ handlers: [console] # to use file handler instead, switch to [file]
+
+loggers:
+ synapse:
+ level: INFO
+
+ synapse.storage:
+ level: INFO
+
+ # example of enabling debugging for a component:
+ #
+ # synapse.federation.transport.server:
+ # level: DEBUG
diff --git a/contrib/systemd/synapse.service b/contrib/systemd/synapse.service
index 967a4debfd..92d94b9d58 100644
--- a/contrib/systemd/synapse.service
+++ b/contrib/systemd/synapse.service
@@ -1,5 +1,5 @@
# This assumes that Synapse has been installed as a system package
-# (e.g. https://aur.archlinux.org/packages/matrix-synapse/ for ArchLinux)
+# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
# rather than in a user home directory or similar under virtualenv.
[Unit]
diff --git a/docs/CAPTCHA_SETUP.rst b/docs/CAPTCHA_SETUP.rst
index db621aedfc..19a204d9ce 100644
--- a/docs/CAPTCHA_SETUP.rst
+++ b/docs/CAPTCHA_SETUP.rst
@@ -25,6 +25,5 @@ Configuring IP used for auth
The ReCaptcha API requires that the IP address of the user who solved the
captcha is sent. If the client is connecting through a proxy or load balancer,
it may be required to use the X-Forwarded-For (XFF) header instead of the origin
-IP address. This can be configured as an option on the home server like so::
-
- captcha_ip_origin_is_x_forwarded: true
+IP address. This can be configured using the x_forwarded directive in the
+listeners section of the homeserver.yaml configuration file.
diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst
index ca10799b00..7390ab85c9 100644
--- a/docs/metrics-howto.rst
+++ b/docs/metrics-howto.rst
@@ -1,22 +1,27 @@
How to monitor Synapse metrics using Prometheus
===============================================
-1: Install prometheus:
- Follow instructions at http://prometheus.io/docs/introduction/install/
+1. Install prometheus:
-2: Enable synapse metrics:
- Simply setting a (local) port number will enable it. Pick a port.
- prometheus itself defaults to 9090, so starting just above that for
- locally monitored services seems reasonable. E.g. 9092:
+ Follow instructions at http://prometheus.io/docs/introduction/install/
- Add to homeserver.yaml
+2. Enable synapse metrics:
- metrics_port: 9092
+ Simply setting a (local) port number will enable it. Pick a port.
+ prometheus itself defaults to 9090, so starting just above that for
+ locally monitored services seems reasonable. E.g. 9092:
- Restart synapse
+ Add to homeserver.yaml::
-3: Add a prometheus target for synapse. It needs to set the ``metrics_path``
- to a non-default value::
+ metrics_port: 9092
+
+ Also ensure that ``enable_metrics`` is set to ``True``.
+
+ Restart synapse.
+
+3. Add a prometheus target for synapse.
+
+ It needs to set the ``metrics_path`` to a non-default value::
- job_name: "synapse"
metrics_path: "/_synapse/metrics"
@@ -24,6 +29,11 @@ How to monitor Synapse metrics using Prometheus
- targets:
"my.server.here:9092"
+ If your prometheus is older than 1.5.2, you will need to replace
+ ``static_configs`` in the above with ``target_groups``.
+
+ Restart prometheus.
+
Standard Metric Names
---------------------
diff --git a/jenkins-dendron-haproxy-postgres.sh b/jenkins-dendron-haproxy-postgres.sh
new file mode 100755
index 0000000000..d64b2d2c9d
--- /dev/null
+++ b/jenkins-dendron-haproxy-postgres.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+set -eux
+
+: ${WORKSPACE:="$(pwd)"}
+
+export WORKSPACE
+export PYTHONDONTWRITEBYTECODE=yep
+export SYNAPSE_CACHE_FACTOR=1
+
+export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy
+
+./jenkins/prepare_synapse.sh
+./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
+./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
+./dendron/jenkins/build_dendron.sh
+./sytest/jenkins/prep_sytest_for_postgres.sh
+
+./sytest/jenkins/install_and_run.sh \
+ --synapse-directory $WORKSPACE \
+ --dendron $WORKSPACE/dendron/bin/dendron \
+ --haproxy \
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index 55ff31fd18..37ae746f4b 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -17,9 +17,3 @@ export SYNAPSE_CACHE_FACTOR=1
./sytest/jenkins/install_and_run.sh \
--synapse-directory $WORKSPACE \
--dendron $WORKSPACE/dendron/bin/dendron \
- --pusher \
- --synchrotron \
- --federation-reader \
- --client-reader \
- --appservice \
- --federation-sender \
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 2cb2eab68b..ea367a1281 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -40,6 +40,7 @@ BOOLEAN_COLUMNS = {
"presence_list": ["accepted"],
"presence_stream": ["currently_active"],
"public_room_list_stream": ["visibility"],
+ "device_lists_outbound_pokes": ["sent"],
}
diff --git a/synapse/__init__.py b/synapse/__init__.py
index d3f445be9c..ff251ce597 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.19.0"
+__version__ = "0.19.2"
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index b3fb408cfd..3f29595256 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -87,6 +87,10 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["who_forgot_in_room"]
)
+ did_forget = (
+ RoomMemberStore.__dict__["did_forget"]
+ )
+
# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 3c58d2de17..e081840a83 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -95,7 +95,7 @@ class TlsConfig(Config):
# make HTTPS requests to this server will check that the TLS
# certificates returned by this server match one of the fingerprints.
#
- # Synapse automatically adds its the fingerprint of its own certificate
+ # Synapse automatically adds the fingerprint of its own certificate
# to the list. So if federation traffic is handle directly by synapse
# then no modification to the list is required.
#
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index bb3d9258a6..90235ff098 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -303,18 +303,10 @@ class TransactionQueue(object):
try:
self.pending_transactions[destination] = 1
+ # XXX: what's this for?
yield run_on_reactor()
while True:
- pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
- pending_presence = self.pending_presence_by_dest.pop(destination, {})
- pending_failures = self.pending_failures_by_dest.pop(destination, [])
-
- pending_edus.extend(
- self.pending_edus_keyed_by_dest.pop(destination, {}).values()
- )
-
limiter = yield get_retry_limiter(
destination,
self.clock,
@@ -326,6 +318,24 @@ class TransactionQueue(object):
yield self._get_new_device_messages(destination)
)
+ # BEGIN CRITICAL SECTION
+ #
+ # In order to avoid a race condition, we need to make sure that
+ # the following code (from popping the queues up to the point
+ # where we decide if we actually have any pending messages) is
+ # atomic - otherwise new PDUs or EDUs might arrive in the
+ # meantime, but not get sent because we hold the
+ # pending_transactions flag.
+
+ pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+ pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_presence = self.pending_presence_by_dest.pop(destination, {})
+ pending_failures = self.pending_failures_by_dest.pop(destination, [])
+
+ pending_edus.extend(
+ self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+ )
+
pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
@@ -355,6 +365,8 @@ class TransactionQueue(object):
)
return
+ # END CRITICAL SECTION
+
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
limiter=limiter,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 8cb47ac417..ca7137f315 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.util import stringutils
@@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+ stream_ordering = RoomStreamToken.parse_stream_token(
+ from_token.room_key).stream
+
possibly_changed = set(changed)
for room_id in rooms_changed:
- # Fetch the current state at the time.
- stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
-
+ # Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering
)
- prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
- except:
- prev_state_ids = {}
+ except errors.StoreError:
+ # we have purged the stream_ordering index since the stream
+ # ordering: treat it the same as a new room
+ event_ids = []
current_state_ids = yield self.state.get_current_state_ids(room_id)
+ # special-case for an empty prev state: include all members
+ # in the changed list
+ if not event_ids:
+ for key, event_id in current_state_ids.iteritems():
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ continue
+
+ # mapping from event_id -> state_dict
+ prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
- if etype == EventTypes.Member:
- prev_event_id = prev_state_ids.get(key, None)
+ if etype != EventTypes.Member:
+ continue
+
+ # check if this member has changed since any of the extremities
+ # at the stream_ordering, and add them to the list if so.
+ for state_dict in prev_state_ids.values():
+ prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key)
+ break
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 996bfd0e23..ed0fa51e7f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1096,7 +1096,7 @@ class FederationHandler(BaseHandler):
if prev_id != event.event_id:
results[(event.type, event.state_key)] = prev_id
else:
- del results[(event.type, event.state_key)]
+ results.pop((event.type, event.state_key), None)
defer.returnValue(results.values())
else:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fdfce2a88c..da610e430f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -531,7 +531,7 @@ class PresenceHandler(object):
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
- states.update({state.user_id: state for state in res})
+ states.update(res)
missing = [user_id for user_id, state in states.items() if not state]
if missing:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7e7671c9a2..99cb7db0db 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -356,7 +356,7 @@ class RoomCreationHandler(BaseHandler):
class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit, is_guest):
+ def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -375,12 +375,15 @@ class RoomContextHandler(BaseHandler):
now_token = yield self.hs.get_event_sources().get_current_token()
+ users = yield self.store.get_users_in_room(room_id)
+ is_peeking = user.to_string() not in users
+
def filter_evts(events):
return filter_events_for_client(
self.store,
user.to_string(),
events,
- is_peeking=is_guest
+ is_peeking=is_peeking
)
event = yield self.store.get_event(event_id, get_prev_content=True,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b2806555cf..2052d6d05f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler):
)
membership = member.membership if member else None
- if membership is not None and membership != Membership.LEAVE:
+ if membership is not None and membership not in [
+ Membership.LEAVE, Membership.BAN
+ ]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 2eb325c7c7..c7afd11111 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -218,7 +218,8 @@ class EmailPusher(object):
)
def seconds_until(self, ts_msec):
- return (ts_msec - self.clock.time_msec()) / 1000
+ secs = (ts_msec - self.clock.time_msec()) / 1000
+ return max(secs, 0)
def get_room_throttle_ms(self, room_id):
if room_id in self.throttle_params:
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 18076e0f3b..ab133db872 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -54,7 +54,9 @@ class BaseSlavedStore(SQLBaseStore):
try:
getattr(self, cache_func).invalidate(tuple(keys))
except AttributeError:
- logger.info("Got unexpected cache_func: %r", cache_func)
+ # We probably haven't pulled in the cache in this worker,
+ # which is fine.
+ pass
self._cache_id_gen.advance(int(stream["position"]))
return defer.succeed(None)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 735c03c7eb..77c64722c7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -46,6 +46,12 @@ class SlavedAccountDataStore(BaseSlavedStore):
)
get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
+ get_tags_for_room = (
+ DataStore.get_tags_for_room.__func__
+ )
+ get_account_data_for_room = (
+ DataStore.get_account_data_for_room.__func__
+ )
get_updated_tags = DataStore.get_updated_tags.__func__
get_updated_account_data_for_user = (
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d72ff6055c..622b2d8540 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
+ _get_unread_counts_by_receipt_txn = (
+ DataStore._get_unread_counts_by_receipt_txn.__func__
+ )
+ _get_unread_counts_by_pos_txn = (
+ DataStore._get_unread_counts_by_pos_txn.__func__
+ )
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 703f4a49bf..40f6c9a386 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.storage import DataStore
+from synapse.storage.presence import PresenceStore
class SlavedPresenceStore(BaseSlavedStore):
@@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore):
_get_active_presence = DataStore._get_active_presence.__func__
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
- get_presence_for_users = DataStore.get_presence_for_users.__func__
+ _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
+ get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index efa77b8c51..fceca2edeb 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -87,9 +87,17 @@ class HttpTransactionCache(object):
deferred = fn(*args, **kwargs)
- # We don't add an errback to the raw deferred, so we ask ObservableDeferred
- # to swallow the error. This is fine as the error will still be reported
- # to the observers.
+ # if the request fails with a Twisted failure, remove it
+ # from the transaction map. This is done to ensure that we don't
+ # cache transient errors like rate-limiting errors, etc.
+ def remove_from_map(err):
+ self.transactions.pop(txn_key, None)
+ return err
+ deferred.addErrback(remove_from_map)
+
+ # We don't add any other errbacks to the raw deferred, so we ask
+ # ObservableDeferred to swallow the error. This is fine as the error will
+ # still be reported to the observers.
observable = ObservableDeferred(deferred, consumeErrors=True)
self.transactions[txn_key] = (observable, self.clock.time_msec())
return observable.observe()
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 2ebf5e59a0..90242a6bac 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -505,7 +505,6 @@ class RoomEventContext(ClientV1RestServlet):
room_id,
event_id,
limit,
- requester.is_guest,
)
if not results:
@@ -609,6 +608,10 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Missing user_id key.")
target = UserID.from_string(content["user_id"])
+ event_content = None
+ if 'reason' in content and membership_action in ['kick', 'ban']:
+ event_content = {'reason': content['reason']}
+
yield self.handlers.room_member_handler.update_membership(
requester=requester,
target=target,
@@ -616,6 +619,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
action=membership_action,
txn_id=txn_id,
third_party_signed=content.get("third_party_signed", None),
+ content=event_content,
)
defer.returnValue((200, {}))
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 3cbeca503c..481ffee200 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -240,6 +240,9 @@ class MediaRepository(object):
if t_method == "crop":
t_len = thumbnailer.crop(t_path, t_width, t_height, t_type)
elif t_method == "scale":
+ t_width, t_height = thumbnailer.aspect(t_width, t_height)
+ t_width = min(m_width, t_width)
+ t_height = min(m_height, t_height)
t_len = thumbnailer.scale(t_path, t_width, t_height, t_type)
else:
t_len = None
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b0dc391190..4410cd9e62 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,6 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
-from synapse.util.caches import intern_dict
from synapse.storage.engines import PostgresEngine
import synapse.metrics
@@ -80,7 +79,13 @@ class LoggingTransaction(object):
def executemany(self, sql, *args):
self._do_execute(self.txn.executemany, sql, *args)
+ def _make_sql_one_line(self, sql):
+ "Strip newlines out of SQL so that the loggers in the DB are on one line"
+ return " ".join(l.strip() for l in sql.splitlines() if l.strip())
+
def _do_execute(self, func, sql, *args):
+ sql = self._make_sql_one_line(sql)
+
# TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
@@ -350,9 +355,9 @@ class SQLBaseStore(object):
Returns:
A list of dicts where the key is the column header.
"""
- col_headers = list(column[0] for column in cursor.description)
+ col_headers = list(intern(column[0]) for column in cursor.description)
results = list(
- intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall()
+ dict(zip(col_headers, row)) for row in cursor.fetchall()
)
return results
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8e17800364..7b5903bf8e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -19,6 +19,8 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+
logger = logging.getLogger(__name__)
@@ -144,6 +146,7 @@ class DeviceStore(SQLBaseStore):
defer.returnValue({d["device_id"]: d for d in devices})
+ @cached(max_entries=10000)
def get_device_list_last_stream_id_for_remote(self, user_id):
"""Get the last stream_id we got for a user. May be None if we haven't
got any information for them.
@@ -156,16 +159,36 @@ class DeviceStore(SQLBaseStore):
allow_none=True,
)
+ @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote",
+ list_name="user_ids", inlineCallbacks=True)
+ def get_device_list_last_stream_id_for_remotes(self, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table="device_lists_remote_extremeties",
+ column="user_id",
+ iterable=user_ids,
+ retcols=("user_id", "stream_id",),
+ desc="get_user_devices_from_cache",
+ )
+
+ results = {user_id: None for user_id in user_ids}
+ results.update({
+ row["user_id"]: row["stream_id"] for row in rows
+ })
+
+ defer.returnValue(results)
+
+ @defer.inlineCallbacks
def mark_remote_user_device_list_as_unsubscribed(self, user_id):
"""Mark that we no longer track device lists for remote user.
"""
- return self._simple_delete(
+ yield self._simple_delete(
table="device_lists_remote_extremeties",
keyvalues={
"user_id": user_id,
},
desc="mark_remote_user_device_list_as_unsubscribed",
)
+ self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
def update_remote_device_list_cache_entry(self, user_id, device_id, content,
stream_id):
@@ -191,6 +214,12 @@ class DeviceStore(SQLBaseStore):
}
)
+ txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
+ txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
+ txn.call_after(
+ self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
+ )
+
self._simple_upsert_txn(
txn,
table="device_lists_remote_extremeties",
@@ -234,6 +263,12 @@ class DeviceStore(SQLBaseStore):
]
)
+ txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
+ txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
+ txn.call_after(
+ self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
+ )
+
self._simple_upsert_txn(
txn,
table="device_lists_remote_extremeties",
@@ -320,6 +355,7 @@ class DeviceStore(SQLBaseStore):
return (now_stream_id, results)
+ @defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache.
@@ -332,27 +368,11 @@ class DeviceStore(SQLBaseStore):
a set of user_ids and results_map is a mapping of
user_id -> device_id -> device_info
"""
- return self.runInteraction(
- "get_user_devices_from_cache", self._get_user_devices_from_cache_txn,
- query_list,
+ user_ids = set(user_id for user_id, _ in query_list)
+ user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
+ user_ids_in_cache = set(
+ user_id for user_id, stream_id in user_map.items() if stream_id
)
-
- def _get_user_devices_from_cache_txn(self, txn, query_list):
- user_ids = {user_id for user_id, _ in query_list}
-
- user_ids_in_cache = set()
- for user_id in user_ids:
- stream_ids = self._simple_select_onecol_txn(
- txn,
- table="device_lists_remote_extremeties",
- keyvalues={
- "user_id": user_id,
- },
- retcol="stream_id",
- )
- if stream_ids:
- user_ids_in_cache.add(user_id)
-
user_ids_not_in_cache = user_ids - user_ids_in_cache
results = {}
@@ -361,32 +381,40 @@ class DeviceStore(SQLBaseStore):
continue
if device_id:
- content = self._simple_select_one_onecol_txn(
- txn,
- table="device_lists_remote_cache",
- keyvalues={
- "user_id": user_id,
- "device_id": device_id,
- },
- retcol="content",
- )
- results.setdefault(user_id, {})[device_id] = json.loads(content)
+ device = yield self._get_cached_user_device(user_id, device_id)
+ results.setdefault(user_id, {})[device_id] = device
else:
- devices = self._simple_select_list_txn(
- txn,
- table="device_lists_remote_cache",
- keyvalues={
- "user_id": user_id,
- },
- retcols=("device_id", "content"),
- )
- results[user_id] = {
- device["device_id"]: json.loads(device["content"])
- for device in devices
- }
- user_ids_in_cache.discard(user_id)
+ results[user_id] = yield self._get_cached_devices_for_user(user_id)
- return user_ids_not_in_cache, results
+ defer.returnValue((user_ids_not_in_cache, results))
+
+ @cachedInlineCallbacks(num_args=2, tree=True)
+ def _get_cached_user_device(self, user_id, device_id):
+ content = yield self._simple_select_one_onecol(
+ table="device_lists_remote_cache",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ },
+ retcol="content",
+ desc="_get_cached_user_device",
+ )
+ defer.returnValue(json.loads(content))
+
+ @cachedInlineCallbacks()
+ def _get_cached_devices_for_user(self, user_id):
+ devices = yield self._simple_select_list(
+ table="device_lists_remote_cache",
+ keyvalues={
+ "user_id": user_id,
+ },
+ retcols=("device_id", "content"),
+ desc="_get_cached_devices_for_user",
+ )
+ defer.returnValue({
+ device["device_id"]: json.loads(device["content"])
+ for device in devices
+ })
def get_devices_with_keys_by_user(self, user_id):
"""Get all devices (with any device keys) for a user
@@ -489,7 +517,7 @@ class DeviceStore(SQLBaseStore):
WHERE stream_id > ?
"""
return self._execute(
- "get_users_and_hosts_device_list", None,
+ "get_all_device_list_changes_for_remotes", None,
sql, from_key,
)
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2040e022fa..b9f1365f92 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -93,7 +93,7 @@ class EndToEndKeyStore(SQLBaseStore):
query_clause = "user_id = ?"
query_params.append(user_id)
- if device_id:
+ if device_id is not None:
query_clause += " AND device_id = ?"
query_params.append(device_id)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index ee88c61954..256e50dc20 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
)
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
+ """For a given room_id and stream_ordering, return the forward
+ extremeties of the room at that point in "time".
+
+ Throws a StoreError if we have since purged the index for
+ stream_orderings from that point.
+
+ Args:
+ room_id (str):
+ stream_ordering (int):
+
+ Returns:
+ deferred, which resolves to a list of event_ids
+ """
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
# We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a
- # try and pin to a stream_ordering from before a restart
+ # stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change)
+ # provided the last_change is recent enough, we now clamp the requested
+ # stream_ordering to it.
if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 7de3e8c58c..14543b4269 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -15,6 +15,7 @@
from ._base import SQLBaseStore
from twisted.internet import defer
+from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
@@ -25,11 +26,46 @@ import ujson as json
logger = logging.getLogger(__name__)
+DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
+DEFAULT_HIGHLIGHT_ACTION = [
+ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
+]
+
+
+def _serialize_action(actions, is_highlight):
+ """Custom serializer for actions. This allows us to "compress" common actions.
+
+ We use the fact that most users have the same actions for notifs (and for
+ highlights).
+ We store these default actions as the empty string rather than the full JSON.
+ Since the empty string isn't valid JSON there is no risk of this clashing with
+ any real JSON actions
+ """
+ if is_highlight:
+ if actions == DEFAULT_HIGHLIGHT_ACTION:
+ return "" # We use empty string as the column is non-NULL
+ else:
+ if actions == DEFAULT_NOTIF_ACTION:
+ return ""
+ return json.dumps(actions)
+
+
+def _deserialize_action(actions, is_highlight):
+ """Custom deserializer for actions. This allows us to "compress" common actions
+ """
+ if actions:
+ return json.loads(actions)
+
+ if is_highlight:
+ return DEFAULT_HIGHLIGHT_ACTION
+ else:
+ return DEFAULT_NOTIF_ACTION
+
+
class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, hs):
- self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)
self.register_background_index_update(
@@ -47,6 +83,11 @@ class EventPushActionsStore(SQLBaseStore):
where_clause="highlight=1"
)
+ self._doing_notif_rotation = False
+ self._rotate_notif_loop = self._clock.looping_call(
+ self._rotate_notifs, 30 * 60 * 1000
+ )
+
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -55,15 +96,17 @@ class EventPushActionsStore(SQLBaseStore):
"""
values = []
for uid, actions in tuples:
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+
values.append({
'room_id': event.room_id,
'event_id': event.event_id,
'user_id': uid,
- 'actions': json.dumps(actions),
+ 'actions': _serialize_action(actions, is_highlight),
'stream_ordering': event.internal_metadata.stream_ordering,
'topological_ordering': event.depth,
'notif': 1,
- 'highlight': 1 if _action_has_highlight(actions) else 0,
+ 'highlight': is_highlight,
})
for uid, __ in tuples:
@@ -77,66 +120,83 @@ class EventPushActionsStore(SQLBaseStore):
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
- def _get_unread_event_push_actions_by_room(txn):
- sql = (
- "SELECT stream_ordering, topological_ordering"
- " FROM events"
- " WHERE room_id = ? AND event_id = ?"
- )
- txn.execute(
- sql, (room_id, last_read_event_id)
- )
- results = txn.fetchall()
- if len(results) == 0:
- return {"notify_count": 0, "highlight_count": 0}
-
- stream_ordering = results[0][0]
- topological_ordering = results[0][1]
- token = RoomStreamToken(
- topological_ordering, stream_ordering
- )
-
- # First get number of notifications.
- # We don't need to put a notif=1 clause as all rows always have
- # notif=1
- sql = (
- "SELECT count(*)"
- " FROM event_push_actions ea"
- " WHERE"
- " user_id = ?"
- " AND room_id = ?"
- " AND %s"
- ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
- txn.execute(sql, (user_id, room_id))
- row = txn.fetchone()
- notify_count = row[0] if row else 0
+ ret = yield self.runInteraction(
+ "get_unread_event_push_actions_by_room",
+ self._get_unread_counts_by_receipt_txn,
+ room_id, user_id, last_read_event_id
+ )
+ defer.returnValue(ret)
- # Now get the number of highlights
- sql = (
- "SELECT count(*)"
- " FROM event_push_actions ea"
- " WHERE"
- " highlight = 1"
- " AND user_id = ?"
- " AND room_id = ?"
- " AND %s"
- ) % (lower_bound(token, self.database_engine, inclusive=False),)
+ def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
+ last_read_event_id):
+ sql = (
+ "SELECT stream_ordering, topological_ordering"
+ " FROM events"
+ " WHERE room_id = ? AND event_id = ?"
+ )
+ txn.execute(
+ sql, (room_id, last_read_event_id)
+ )
+ results = txn.fetchall()
+ if len(results) == 0:
+ return {"notify_count": 0, "highlight_count": 0}
- txn.execute(sql, (user_id, room_id))
- row = txn.fetchone()
- highlight_count = row[0] if row else 0
+ stream_ordering = results[0][0]
+ topological_ordering = results[0][1]
- return {
- "notify_count": notify_count,
- "highlight_count": highlight_count,
- }
+ return self._get_unread_counts_by_pos_txn(
+ txn, room_id, user_id, topological_ordering, stream_ordering
+ )
- ret = yield self.runInteraction(
- "get_unread_event_push_actions_by_room",
- _get_unread_event_push_actions_by_room
+ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
+ stream_ordering):
+ token = RoomStreamToken(
+ topological_ordering, stream_ordering
)
- defer.returnValue(ret)
+
+ # First get number of notifications.
+ # We don't need to put a notif=1 clause as all rows always have
+ # notif=1
+ sql = (
+ "SELECT count(*)"
+ " FROM event_push_actions ea"
+ " WHERE"
+ " user_id = ?"
+ " AND room_id = ?"
+ " AND %s"
+ ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+ txn.execute(sql, (user_id, room_id))
+ row = txn.fetchone()
+ notify_count = row[0] if row else 0
+
+ txn.execute("""
+ SELECT notif_count FROM event_push_summary
+ WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
+ """, (room_id, user_id, stream_ordering,))
+ rows = txn.fetchall()
+ if rows:
+ notify_count += rows[0][0]
+
+ # Now get the number of highlights
+ sql = (
+ "SELECT count(*)"
+ " FROM event_push_actions ea"
+ " WHERE"
+ " highlight = 1"
+ " AND user_id = ?"
+ " AND room_id = ?"
+ " AND %s"
+ ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+ txn.execute(sql, (user_id, room_id))
+ row = txn.fetchone()
+ highlight_count = row[0] if row else 0
+
+ return {
+ "notify_count": notify_count,
+ "highlight_count": highlight_count,
+ }
@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
@@ -176,7 +236,8 @@ class EventPushActionsStore(SQLBaseStore):
# find rooms that have a read receipt in them and return the next
# push actions
sql = (
- "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+ " ep.highlight "
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
@@ -217,7 +278,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " e.received_ts"
+ " ep.highlight "
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
@@ -246,7 +307,7 @@ class EventPushActionsStore(SQLBaseStore):
"event_id": row[0],
"room_id": row[1],
"stream_ordering": row[2],
- "actions": json.loads(row[3]),
+ "actions": _deserialize_action(row[3], row[4]),
} for row in after_read_receipt + no_read_receipt
]
@@ -285,7 +346,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " e.received_ts"
+ " ep.highlight, e.received_ts"
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
@@ -327,7 +388,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
- " e.received_ts"
+ " ep.highlight, e.received_ts"
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
@@ -357,8 +418,8 @@ class EventPushActionsStore(SQLBaseStore):
"event_id": row[0],
"room_id": row[1],
"stream_ordering": row[2],
- "actions": json.loads(row[3]),
- "received_ts": row[4],
+ "actions": _deserialize_action(row[3], row[4]),
+ "received_ts": row[5],
} for row in after_read_receipt + no_read_receipt
]
@@ -392,7 +453,7 @@ class EventPushActionsStore(SQLBaseStore):
sql = (
"SELECT epa.event_id, epa.room_id,"
" epa.stream_ordering, epa.topological_ordering,"
- " epa.actions, epa.profile_tag, e.received_ts"
+ " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
" FROM event_push_actions epa, events e"
" WHERE epa.event_id = e.event_id"
" AND epa.user_id = ? %s"
@@ -407,7 +468,7 @@ class EventPushActionsStore(SQLBaseStore):
"get_push_actions_for_user", f
)
for pa in push_actions:
- pa["actions"] = json.loads(pa["actions"])
+ pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
defer.returnValue(push_actions)
@defer.inlineCallbacks
@@ -448,10 +509,14 @@ class EventPushActionsStore(SQLBaseStore):
)
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
- topological_ordering):
+ topological_ordering, stream_ordering):
"""
- Purges old, stale push actions for a user and room before a given
- topological_ordering
+ Purges old push actions for a user and room before a given
+ topological_ordering.
+
+ We however keep a months worth of highlighted notifications, so that
+ users can still get a list of recent highlights.
+
Args:
txn: The transcation
room_id: Room ID to delete from
@@ -475,10 +540,16 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
- " topological_ordering < ? AND stream_ordering < ?",
+ " topological_ordering <= ?"
+ " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
)
+ txn.execute("""
+ DELETE FROM event_push_summary
+ WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
+ """, (room_id, user_id, stream_ordering))
+
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
@@ -495,6 +566,14 @@ class EventPushActionsStore(SQLBaseStore):
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
+ logger.info("Searching for stream ordering 1 day ago")
+ self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 day ago: it's %d",
+ self.stream_ordering_day_ago
+ )
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
@@ -534,6 +613,131 @@ class EventPushActionsStore(SQLBaseStore):
return range_end
+ @defer.inlineCallbacks
+ def _rotate_notifs(self):
+ if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
+ return
+ self._doing_notif_rotation = True
+
+ try:
+ while True:
+ logger.info("Rotating notifications")
+
+ caught_up = yield self.runInteraction(
+ "_rotate_notifs",
+ self._rotate_notifs_txn
+ )
+ if caught_up:
+ break
+ yield sleep(5)
+ finally:
+ self._doing_notif_rotation = False
+
+ def _rotate_notifs_txn(self, txn):
+ """Archives older notifications into event_push_summary. Returns whether
+ the archiving process has caught up or not.
+ """
+
+ old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_push_summary_stream_ordering",
+ keyvalues={},
+ retcol="stream_ordering",
+ )
+
+ # We don't to try and rotate millions of rows at once, so we cap the
+ # maximum stream ordering we'll rotate before.
+ txn.execute("""
+ SELECT stream_ordering FROM event_push_actions
+ WHERE stream_ordering > ?
+ ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
+ """, (old_rotate_stream_ordering,))
+ stream_row = txn.fetchone()
+ if stream_row:
+ offset_stream_ordering, = stream_row
+ rotate_to_stream_ordering = min(
+ self.stream_ordering_day_ago, offset_stream_ordering
+ )
+ caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
+ else:
+ rotate_to_stream_ordering = self.stream_ordering_day_ago
+ caught_up = True
+
+ logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
+
+ self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
+
+ # We have caught up iff we were limited by `stream_ordering_day_ago`
+ return caught_up
+
+ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
+ old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+ txn,
+ table="event_push_summary_stream_ordering",
+ keyvalues={},
+ retcol="stream_ordering",
+ )
+
+ # Calculate the new counts that should be upserted into event_push_summary
+ sql = """
+ SELECT user_id, room_id,
+ coalesce(old.notif_count, 0) + upd.notif_count,
+ upd.stream_ordering,
+ old.user_id
+ FROM (
+ SELECT user_id, room_id, count(*) as notif_count,
+ max(stream_ordering) as stream_ordering
+ FROM event_push_actions
+ WHERE ? <= stream_ordering AND stream_ordering < ?
+ AND highlight = 0
+ GROUP BY user_id, room_id
+ ) AS upd
+ LEFT JOIN event_push_summary AS old USING (user_id, room_id)
+ """
+
+ txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
+ rows = txn.fetchall()
+
+ logger.info("Rotating notifications, handling %d rows", len(rows))
+
+ # If the `old.user_id` above is NULL then we know there isn't already an
+ # entry in the table, so we simply insert it. Otherwise we update the
+ # existing table.
+ self._simple_insert_many_txn(
+ txn,
+ table="event_push_summary",
+ values=[
+ {
+ "user_id": row[0],
+ "room_id": row[1],
+ "notif_count": row[2],
+ "stream_ordering": row[3],
+ }
+ for row in rows if row[4] is None
+ ]
+ )
+
+ txn.executemany(
+ """
+ UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
+ WHERE user_id = ? AND room_id = ?
+ """,
+ ((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
+ )
+
+ txn.execute(
+ "DELETE FROM event_push_actions"
+ " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
+ (old_rotate_stream_ordering, rotate_to_stream_ordering,)
+ )
+
+ logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
+
+ txn.execute(
+ "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
+ (rotate_to_stream_ordering,)
+ )
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8659f605a5..db01eb6d14 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -302,7 +302,7 @@ class EventsStore(SQLBaseStore):
room_id
)
new_latest_event_ids = yield self._calculate_new_extremeties(
- room_id, [ev for ev, _ in ev_ctx_rm]
+ room_id, ev_ctx_rm, latest_event_ids
)
if new_latest_event_ids == set(latest_event_ids):
@@ -311,6 +311,21 @@ class EventsStore(SQLBaseStore):
new_forward_extremeties[room_id] = new_latest_event_ids
+ len_1 = (
+ len(latest_event_ids) == 1
+ and len(new_latest_event_ids) == 1
+ )
+ if len_1:
+ all_single_prev_not_state = all(
+ len(event.prev_events) == 1
+ and not event.is_state()
+ for event, ctx in ev_ctx_rm
+ )
+ # Don't bother calculating state if they're just
+ # a long chain of single ancestor non-state events.
+ if all_single_prev_not_state:
+ continue
+
state = yield self._calculate_state_delta(
room_id, ev_ctx_rm, new_latest_event_ids
)
@@ -329,27 +344,24 @@ class EventsStore(SQLBaseStore):
persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
- def _calculate_new_extremeties(self, room_id, events):
+ def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
- latest_event_ids = yield self.get_latest_event_ids_in_room(
- room_id
- )
new_latest_event_ids = set(latest_event_ids)
# First, add all the new events to the list
new_latest_event_ids.update(
- event.event_id for event in events
- if not event.internal_metadata.is_outlier()
+ event.event_id for event, ctx in event_contexts
+ if not event.internal_metadata.is_outlier() and not ctx.rejected
)
# Now remove all events that are referenced by the to-be-added events
new_latest_event_ids.difference_update(
e_id
- for event in events
+ for event, ctx in event_contexts
for e_id, _ in event.prev_events
- if not event.internal_metadata.is_outlier()
+ if not event.internal_metadata.is_outlier() and not ctx.rejected
)
# And finally remove any events that are referenced by previously added
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b357f22be7..ed84db6b4b 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 40
+SCHEMA_VERSION = 41
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 7460f98a1f..4d1590d2b4 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -15,7 +15,7 @@
from ._base import SQLBaseStore
from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from collections import namedtuple
from twisted.internet import defer
@@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
self.presence_stream_cache.entity_has_changed,
state.user_id, stream_id,
)
+ self._invalidate_cache_and_stream(
+ txn, self._get_presence_for_user, (state.user_id,)
+ )
# Actually insert new rows
self._simple_insert_many_txn(
@@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
"get_all_presence_updates", get_all_presence_updates_txn
)
- @defer.inlineCallbacks
+ @cached()
+ def _get_presence_for_user(self, user_id):
+ raise NotImplementedError()
+
+ @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
+ num_args=1, inlineCallbacks=True)
def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch(
table="presence_stream",
@@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
for row in rows:
row["currently_active"] = bool(row["currently_active"])
- defer.returnValue([UserPresenceState(**row) for row in rows])
+ defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index f72d15f5ed..5cf41501ea 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore):
room_id=room_id,
user_id=user_id,
topological_ordering=topological_ordering,
+ stream_ordering=stream_ordering,
)
return True
diff --git a/synapse/storage/schema/delta/40/event_push_summary.sql b/synapse/storage/schema/delta/40/event_push_summary.sql
new file mode 100644
index 0000000000..3918f0b794
--- /dev/null
+++ b/synapse/storage/schema/delta/40/event_push_summary.sql
@@ -0,0 +1,37 @@
+/* Copyright 2017 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Aggregate of old notification counts that have been deleted out of the
+-- main event_push_actions table. This count does not include those that were
+-- highlights, as they remain in the event_push_actions table.
+CREATE TABLE event_push_summary (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ notif_count BIGINT NOT NULL,
+ stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
+
+
+-- The stream ordering up to which we have aggregated the event_push_actions
+-- table into event_push_summary
+CREATE TABLE event_push_summary_stream_ordering (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_ordering BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
diff --git a/synapse/storage/schema/delta/40/pushers.sql b/synapse/storage/schema/delta/40/pushers.sql
new file mode 100644
index 0000000000..054a223f14
--- /dev/null
+++ b/synapse/storage/schema/delta/40/pushers.sql
@@ -0,0 +1,39 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS pushers2 (
+ id BIGINT PRIMARY KEY,
+ user_name TEXT NOT NULL,
+ access_token BIGINT DEFAULT NULL,
+ profile_tag TEXT NOT NULL,
+ kind TEXT NOT NULL,
+ app_id TEXT NOT NULL,
+ app_display_name TEXT NOT NULL,
+ device_display_name TEXT NOT NULL,
+ pushkey TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ lang TEXT,
+ data TEXT,
+ last_stream_ordering INTEGER,
+ last_success BIGINT,
+ failing_since BIGINT,
+ UNIQUE (app_id, pushkey, user_name)
+);
+
+INSERT INTO pushers2 SELECT * FROM PUSHERS;
+
+DROP TABLE PUSHERS;
+
+ALTER TABLE pushers2 RENAME TO pushers;
diff --git a/synapse/storage/schema/delta/41/device_outbound_index.sql b/synapse/storage/schema/delta/41/device_outbound_index.sql
new file mode 100644
index 0000000000..62f0b9892b
--- /dev/null
+++ b/synapse/storage/schema/delta/41/device_outbound_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX device_lists_outbound_pokes_stream ON device_lists_outbound_pokes(stream_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1b3800eb6a..84482d8285 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_ids_for_events(self, event_ids, types):
+ def get_state_ids_for_events(self, event_ids, types=None):
+ """
+ Get the state dicts corresponding to a list of events
+
+ Args:
+ event_ids(list(str)): events whose state should be returned
+ types(list[(str, str)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. May be None, which
+ matches any key
+
+ Returns:
+ A deferred dict from event_id -> (type, state_key) -> state_event
+ """
event_to_groups = yield self._get_state_group_for_events(
event_ids,
)
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index e9044afa2e..3135488353 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -17,9 +17,15 @@ from twisted.internet import defer
import tests.unittest
import tests.utils
+from mock import Mock
USER_ID = "@user:example.com"
+PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
+HIGHLIGHT = [
+ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
+]
+
class EventPushActionsStoreTestCase(tests.unittest.TestCase):
@@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
yield self.store.get_unread_push_actions_for_user_in_range_for_email(
USER_ID, 0, 1000, 20
)
+
+ @defer.inlineCallbacks
+ def test_count_aggregation(self):
+ room_id = "!foo:example.com"
+ user_id = "@user1235:example.com"
+
+ @defer.inlineCallbacks
+ def _assert_counts(noitf_count, highlight_count):
+ counts = yield self.store.runInteraction(
+ "", self.store._get_unread_counts_by_pos_txn,
+ room_id, user_id, 0, 0
+ )
+ self.assertEquals(
+ counts,
+ {"notify_count": noitf_count, "highlight_count": highlight_count}
+ )
+
+ def _inject_actions(stream, action):
+ event = Mock()
+ event.room_id = room_id
+ event.event_id = "$test:example.com"
+ event.internal_metadata.stream_ordering = stream
+ event.depth = stream
+
+ tuples = [(user_id, action)]
+
+ return self.store.runInteraction(
+ "", self.store._set_push_actions_for_event_and_users_txn,
+ event, tuples
+ )
+
+ def _rotate(stream):
+ return self.store.runInteraction(
+ "", self.store._rotate_notifs_before_txn, stream
+ )
+
+ def _mark_read(stream, depth):
+ return self.store.runInteraction(
+ "", self.store._remove_old_push_actions_before_txn,
+ room_id, user_id, depth, stream
+ )
+
+ yield _assert_counts(0, 0)
+ yield _inject_actions(1, PlAIN_NOTIF)
+ yield _assert_counts(1, 0)
+ yield _rotate(2)
+ yield _assert_counts(1, 0)
+
+ yield _inject_actions(3, PlAIN_NOTIF)
+ yield _assert_counts(2, 0)
+ yield _rotate(4)
+ yield _assert_counts(2, 0)
+
+ yield _inject_actions(5, PlAIN_NOTIF)
+ yield _mark_read(3, 3)
+ yield _assert_counts(1, 0)
+
+ yield _mark_read(5, 5)
+ yield _assert_counts(0, 0)
+
+ yield _inject_actions(6, PlAIN_NOTIF)
+ yield _rotate(7)
+
+ yield self.store._simple_delete(
+ table="event_push_actions",
+ keyvalues={"1": 1},
+ desc="",
+ )
+
+ yield _assert_counts(1, 0)
+
+ yield _mark_read(7, 7)
+ yield _assert_counts(0, 0)
+
+ yield _inject_actions(8, HIGHLIGHT)
+ yield _assert_counts(1, 1)
+ yield _rotate(9)
+ yield _assert_counts(1, 1)
+ yield _rotate(10)
+ yield _assert_counts(1, 1)
|