diff options
30 files changed, 737 insertions, 166 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index 22aa7cb9b4..c8856d9575 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,10 @@ +Changes in synapse v0.19.2 (2017-02-20) +======================================= + +* Fix bug with event visibility check in /context/ API. Thanks to Tokodomo for + pointing it out! (PR #1929) + + Changes in synapse v0.19.1 (2017-02-09) ======================================= diff --git a/README.rst b/README.rst index 77e0b470a3..a73c71c77e 100644 --- a/README.rst +++ b/README.rst @@ -332,9 +332,8 @@ https://obs.infoserver.lv/project/monitor/matrix-synapse ArchLinux --------- -The quickest way to get up and running with ArchLinux is probably with Ivan -Shapovalov's AUR package from -https://aur.archlinux.org/packages/matrix-synapse/, which should pull in all +The quickest way to get up and running with ArchLinux is probably with the community package +https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in all the necessary dependencies. Alternatively, to install using pip a few changes may be needed as ArchLinux diff --git a/contrib/cmdclient/console.py b/contrib/cmdclient/console.py index 8bb03ce66a..4918fa1a9a 100755 --- a/contrib/cmdclient/console.py +++ b/contrib/cmdclient/console.py @@ -32,7 +32,7 @@ import urlparse import nacl.signing import nacl.encoding -from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException +from signedjson.sign import verify_signed_json, SignatureVerifyException CONFIG_JSON = "cmdclient_config.json" diff --git a/contrib/example_log_config.yaml b/contrib/example_log_config.yaml new file mode 100644 index 0000000000..7f7c8ba588 --- /dev/null +++ b/contrib/example_log_config.yaml @@ -0,0 +1,48 @@ +# Example log_config file for synapse. To enable, point `log_config` to it in +# `homeserver.yaml`, and restart synapse. +# +# This configuration will produce similar results to the defaults within +# synapse, but can be edited to give more flexibility. + +version: 1 + +formatters: + fmt: + format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s' + +filters: + context: + (): synapse.util.logcontext.LoggingContextFilter + request: "" + +handlers: + # example output to console + console: + class: logging.StreamHandler + filters: [context] + + # example output to file - to enable, edit 'root' config below. + file: + class: logging.handlers.RotatingFileHandler + formatter: fmt + filename: /var/log/synapse/homeserver.log + maxBytes: 100000000 + backupCount: 3 + filters: [context] + + +root: + level: INFO + handlers: [console] # to use file handler instead, switch to [file] + +loggers: + synapse: + level: INFO + + synapse.storage: + level: INFO + + # example of enabling debugging for a component: + # + # synapse.federation.transport.server: + # level: DEBUG diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst index ca10799b00..7390ab85c9 100644 --- a/docs/metrics-howto.rst +++ b/docs/metrics-howto.rst @@ -1,22 +1,27 @@ How to monitor Synapse metrics using Prometheus =============================================== -1: Install prometheus: - Follow instructions at http://prometheus.io/docs/introduction/install/ +1. Install prometheus: -2: Enable synapse metrics: - Simply setting a (local) port number will enable it. Pick a port. - prometheus itself defaults to 9090, so starting just above that for - locally monitored services seems reasonable. E.g. 9092: + Follow instructions at http://prometheus.io/docs/introduction/install/ - Add to homeserver.yaml +2. Enable synapse metrics: - metrics_port: 9092 + Simply setting a (local) port number will enable it. Pick a port. + prometheus itself defaults to 9090, so starting just above that for + locally monitored services seems reasonable. E.g. 9092: - Restart synapse + Add to homeserver.yaml:: -3: Add a prometheus target for synapse. It needs to set the ``metrics_path`` - to a non-default value:: + metrics_port: 9092 + + Also ensure that ``enable_metrics`` is set to ``True``. + + Restart synapse. + +3. Add a prometheus target for synapse. + + It needs to set the ``metrics_path`` to a non-default value:: - job_name: "synapse" metrics_path: "/_synapse/metrics" @@ -24,6 +29,11 @@ How to monitor Synapse metrics using Prometheus - targets: "my.server.here:9092" + If your prometheus is older than 1.5.2, you will need to replace + ``static_configs`` in the above with ``target_groups``. + + Restart prometheus. + Standard Metric Names --------------------- diff --git a/synapse/__init__.py b/synapse/__init__.py index da8ef90a77..ff251ce597 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.19.1" +__version__ = "0.19.2" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index b3fb408cfd..3f29595256 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -87,6 +87,10 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) + did_forget = ( + RoomMemberStore.__dict__["did_forget"] + ) + # XXX: This is a bit broken because we don't persist the accepted list in a # way that can be replicated. This means that we don't have a way to # invalidate the cache correctly. diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 3c58d2de17..e081840a83 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -95,7 +95,7 @@ class TlsConfig(Config): # make HTTPS requests to this server will check that the TLS # certificates returned by this server match one of the fingerprints. # - # Synapse automatically adds its the fingerprint of its own certificate + # Synapse automatically adds the fingerprint of its own certificate # to the list. So if federation traffic is handle directly by synapse # then no modification to the list is required. # diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb3d9258a6..90235ff098 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -303,18 +303,10 @@ class TransactionQueue(object): try: self.pending_transactions[destination] = 1 + # XXX: what's this for? yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) - - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - limiter = yield get_retry_limiter( destination, self.clock, @@ -326,6 +318,24 @@ class TransactionQueue(object): yield self._get_new_device_messages(destination) ) + # BEGIN CRITICAL SECTION + # + # In order to avoid a race condition, we need to make sure that + # the following code (from popping the queues up to the point + # where we decide if we actually have any pending messages) is + # atomic - otherwise new PDUs or EDUs might arrive in the + # meantime, but not get sent because we hold the + # pending_transactions flag. + + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) + + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append( @@ -355,6 +365,8 @@ class TransactionQueue(object): ) return + # END CRITICAL SECTION + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, limiter=limiter, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 8cb47ac417..ca7137f315 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils @@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler): # Then work out if any users have since joined rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + stream_ordering = RoomStreamToken.parse_stream_token( + from_token.room_key).stream + possibly_changed = set(changed) for room_id in rooms_changed: - # Fetch the current state at the time. - stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key) - + # Fetch the current state at the time. try: event_ids = yield self.store.get_forward_extremeties_for_room( room_id, stream_ordering=stream_ordering ) - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) - except: - prev_state_ids = {} + except errors.StoreError: + # we have purged the stream_ordering index since the stream + # ordering: treat it the same as a new room + event_ids = [] current_state_ids = yield self.state.get_current_state_ids(room_id) + # special-case for an empty prev state: include all members + # in the changed list + if not event_ids: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + continue + + # mapping from event_id -> state_dict + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. for key, event_id in current_state_ids.iteritems(): etype, state_key = key - if etype == EventTypes.Member: - prev_event_id = prev_state_ids.get(key, None) + if etype != EventTypes.Member: + continue + + # check if this member has changed since any of the extremities + # at the stream_ordering, and add them to the list if so. + for state_dict in prev_state_ids.values(): + prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: possibly_changed.add(state_key) + break users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fdfce2a88c..da610e430f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -531,7 +531,7 @@ class PresenceHandler(object): # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) - states.update({state.user_id: state for state in res}) + states.update(res) missing = [user_id for user_id, state in states.items() if not state] if missing: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7e7671c9a2..99cb7db0db 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -356,7 +356,7 @@ class RoomCreationHandler(BaseHandler): class RoomContextHandler(BaseHandler): @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit, is_guest): + def get_event_context(self, user, room_id, event_id, limit): """Retrieves events, pagination tokens and state around a given event in a room. @@ -375,12 +375,15 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() + users = yield self.store.get_users_in_room(room_id) + is_peeking = user.to_string() not in users + def filter_evts(events): return filter_events_for_client( self.store, user.to_string(), events, - is_peeking=is_guest + is_peeking=is_peeking ) event = yield self.store.get_event(event_id, get_prev_content=True, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b2806555cf..2052d6d05f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler): ) membership = member.membership if member else None - if membership is not None and membership != Membership.LEAVE: + if membership is not None and membership not in [ + Membership.LEAVE, Membership.BAN + ]: raise SynapseError(400, "User %s in room %s" % ( user_id, room_id )) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 2eb325c7c7..c7afd11111 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -218,7 +218,8 @@ class EmailPusher(object): ) def seconds_until(self, ts_msec): - return (ts_msec - self.clock.time_msec()) / 1000 + secs = (ts_msec - self.clock.time_msec()) / 1000 + return max(secs, 0) def get_room_throttle_ms(self, room_id): if room_id in self.throttle_params: diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 735c03c7eb..77c64722c7 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -46,6 +46,12 @@ class SlavedAccountDataStore(BaseSlavedStore): ) get_tags_for_user = TagsStore.__dict__["get_tags_for_user"] + get_tags_for_room = ( + DataStore.get_tags_for_room.__func__ + ) + get_account_data_for_room = ( + DataStore.get_account_data_for_room.__func__ + ) get_updated_tags = DataStore.get_updated_tags.__func__ get_updated_account_data_for_user = ( diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d72ff6055c..622b2d8540 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_unread_counts_by_receipt_txn = ( + DataStore._get_unread_counts_by_receipt_txn.__func__ + ) + _get_unread_counts_by_pos_txn = ( + DataStore._get_unread_counts_by_pos_txn.__func__ + ) _get_state_group_for_events = ( StateStore.__dict__["_get_state_group_for_events"] ) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 703f4a49bf..40f6c9a386 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.storage import DataStore +from synapse.storage.presence import PresenceStore class SlavedPresenceStore(BaseSlavedStore): @@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore): _get_active_presence = DataStore._get_active_presence.__func__ take_presence_startup_info = DataStore.take_presence_startup_info.__func__ - get_presence_for_users = DataStore.get_presence_for_users.__func__ + _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] + get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] def get_current_presence_token(self): return self._presence_id_gen.get_current_token() diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index efa77b8c51..fceca2edeb 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -87,9 +87,17 @@ class HttpTransactionCache(object): deferred = fn(*args, **kwargs) - # We don't add an errback to the raw deferred, so we ask ObservableDeferred - # to swallow the error. This is fine as the error will still be reported - # to the observers. + # if the request fails with a Twisted failure, remove it + # from the transaction map. This is done to ensure that we don't + # cache transient errors like rate-limiting errors, etc. + def remove_from_map(err): + self.transactions.pop(txn_key, None) + return err + deferred.addErrback(remove_from_map) + + # We don't add any other errbacks to the raw deferred, so we ask + # ObservableDeferred to swallow the error. This is fine as the error will + # still be reported to the observers. observable = ObservableDeferred(deferred, consumeErrors=True) self.transactions[txn_key] = (observable, self.clock.time_msec()) return observable.observe() diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2ebf5e59a0..90242a6bac 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -505,7 +505,6 @@ class RoomEventContext(ClientV1RestServlet): room_id, event_id, limit, - requester.is_guest, ) if not results: @@ -609,6 +608,10 @@ class RoomMembershipRestServlet(ClientV1RestServlet): raise SynapseError(400, "Missing user_id key.") target = UserID.from_string(content["user_id"]) + event_content = None + if 'reason' in content and membership_action in ['kick', 'ban']: + event_content = {'reason': content['reason']} + yield self.handlers.room_member_handler.update_membership( requester=requester, target=target, @@ -616,6 +619,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): action=membership_action, txn_id=txn_id, third_party_signed=content.get("third_party_signed", None), + content=event_content, ) defer.returnValue((200, {})) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 3cbeca503c..481ffee200 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -240,6 +240,9 @@ class MediaRepository(object): if t_method == "crop": t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) elif t_method == "scale": + t_width, t_height = thumbnailer.aspect(t_width, t_height) + t_width = min(m_width, t_width) + t_height = min(m_height, t_height) t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) else: t_len = None diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b0dc391190..557701d0c4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -80,7 +80,13 @@ class LoggingTransaction(object): def executemany(self, sql, *args): self._do_execute(self.txn.executemany, sql, *args) + def _make_sql_one_line(self, sql): + "Strip newlines out of SQL so that the loggers in the DB are on one line" + return " ".join(l.strip() for l in sql.splitlines() if l.strip()) + def _do_execute(self, func, sql, *args): + sql = self._make_sql_one_line(sql) + # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8e17800364..d22db0a0b9 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -19,6 +19,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks + logger = logging.getLogger(__name__) @@ -144,6 +146,7 @@ class DeviceStore(SQLBaseStore): defer.returnValue({d["device_id"]: d for d in devices}) + @cached(max_entries=10000) def get_device_list_last_stream_id_for_remote(self, user_id): """Get the last stream_id we got for a user. May be None if we haven't got any information for them. @@ -156,16 +159,36 @@ class DeviceStore(SQLBaseStore): allow_none=True, ) + @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote", + list_name="user_ids", inlineCallbacks=True) + def get_device_list_last_stream_id_for_remotes(self, user_ids): + rows = yield self._simple_select_many_batch( + table="device_lists_remote_extremeties", + column="user_id", + iterable=user_ids, + retcols=("user_id", "stream_id",), + desc="get_user_devices_from_cache", + ) + + results = {user_id: None for user_id in user_ids} + results.update({ + row["user_id"]: row["stream_id"] for row in rows + }) + + defer.returnValue(results) + + @defer.inlineCallbacks def mark_remote_user_device_list_as_unsubscribed(self, user_id): """Mark that we no longer track device lists for remote user. """ - return self._simple_delete( + yield self._simple_delete( table="device_lists_remote_extremeties", keyvalues={ "user_id": user_id, }, desc="mark_remote_user_device_list_as_unsubscribed", ) + self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): @@ -191,6 +214,12 @@ class DeviceStore(SQLBaseStore): } ) + txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) + txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) + txn.call_after( + self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) + ) + self._simple_upsert_txn( txn, table="device_lists_remote_extremeties", @@ -234,6 +263,12 @@ class DeviceStore(SQLBaseStore): ] ) + txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) + txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,)) + txn.call_after( + self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) + ) + self._simple_upsert_txn( txn, table="device_lists_remote_extremeties", @@ -320,6 +355,7 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, results) + @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -332,27 +368,11 @@ class DeviceStore(SQLBaseStore): a set of user_ids and results_map is a mapping of user_id -> device_id -> device_info """ - return self.runInteraction( - "get_user_devices_from_cache", self._get_user_devices_from_cache_txn, - query_list, + user_ids = set(user_id for user_id, _ in query_list) + user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) + user_ids_in_cache = set( + user_id for user_id, stream_id in user_map.items() if stream_id ) - - def _get_user_devices_from_cache_txn(self, txn, query_list): - user_ids = {user_id for user_id, _ in query_list} - - user_ids_in_cache = set() - for user_id in user_ids: - stream_ids = self._simple_select_onecol_txn( - txn, - table="device_lists_remote_extremeties", - keyvalues={ - "user_id": user_id, - }, - retcol="stream_id", - ) - if stream_ids: - user_ids_in_cache.add(user_id) - user_ids_not_in_cache = user_ids - user_ids_in_cache results = {} @@ -361,32 +381,40 @@ class DeviceStore(SQLBaseStore): continue if device_id: - content = self._simple_select_one_onecol_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - retcol="content", - ) - results.setdefault(user_id, {})[device_id] = json.loads(content) + device = yield self._get_cached_user_device(user_id, device_id) + results.setdefault(user_id, {})[device_id] = device else: - devices = self._simple_select_list_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - }, - retcols=("device_id", "content"), - ) - results[user_id] = { - device["device_id"]: json.loads(device["content"]) - for device in devices - } - user_ids_in_cache.discard(user_id) + results[user_id] = yield self._get_cached_devices_for_user(user_id) - return user_ids_not_in_cache, results + defer.returnValue((user_ids_not_in_cache, results)) + + @cachedInlineCallbacks(num_args=2, tree=True) + def _get_cached_user_device(self, user_id, device_id): + content = yield self._simple_select_one_onecol( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="content", + desc="_get_cached_user_device", + ) + defer.returnValue(json.loads(content)) + + @cachedInlineCallbacks() + def _get_cached_devices_for_user(self, user_id): + devices = yield self._simple_select_list( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + retcols=("device_id", "content"), + desc="_get_cached_devices_for_user", + ) + defer.returnValue({ + device["device_id"]: json.loads(device["content"]) + for device in devices + }) def get_devices_with_keys_by_user(self, user_id): """Get all devices (with any device keys) for a user diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index ee88c61954..256e50dc20 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore): ) def get_forward_extremeties_for_room(self, room_id, stream_ordering): + """For a given room_id and stream_ordering, return the forward + extremeties of the room at that point in "time". + + Throws a StoreError if we have since purged the index for + stream_orderings from that point. + + Args: + room_id (str): + stream_ordering (int): + + Returns: + deferred, which resolves to a list of event_ids + """ # We want to make the cache more effective, so we clamp to the last # change before the given ordering. last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) # We don't always have a full stream_to_exterm_id table, e.g. after # the upgrade that introduced it, so we make sure we never ask for a - # try and pin to a stream_ordering from before a restart + # stream_ordering from before a restart last_change = max(self._stream_order_on_start, last_change) + # provided the last_change is recent enough, we now clamp the requested + # stream_ordering to it. if last_change > self.stream_ordering_month_ago: stream_ordering = min(last_change, stream_ordering) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 7de3e8c58c..14543b4269 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.types import RoomStreamToken from .stream import lower_bound @@ -25,11 +26,46 @@ import ujson as json logger = logging.getLogger(__name__) +DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}] +DEFAULT_HIGHLIGHT_ACTION = [ + "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"} +] + + +def _serialize_action(actions, is_highlight): + """Custom serializer for actions. This allows us to "compress" common actions. + + We use the fact that most users have the same actions for notifs (and for + highlights). + We store these default actions as the empty string rather than the full JSON. + Since the empty string isn't valid JSON there is no risk of this clashing with + any real JSON actions + """ + if is_highlight: + if actions == DEFAULT_HIGHLIGHT_ACTION: + return "" # We use empty string as the column is non-NULL + else: + if actions == DEFAULT_NOTIF_ACTION: + return "" + return json.dumps(actions) + + +def _deserialize_action(actions, is_highlight): + """Custom deserializer for actions. This allows us to "compress" common actions + """ + if actions: + return json.loads(actions) + + if is_highlight: + return DEFAULT_HIGHLIGHT_ACTION + else: + return DEFAULT_NOTIF_ACTION + + class EventPushActionsStore(SQLBaseStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" def __init__(self, hs): - self.stream_ordering_month_ago = None super(EventPushActionsStore, self).__init__(hs) self.register_background_index_update( @@ -47,6 +83,11 @@ class EventPushActionsStore(SQLBaseStore): where_clause="highlight=1" ) + self._doing_notif_rotation = False + self._rotate_notif_loop = self._clock.looping_call( + self._rotate_notifs, 30 * 60 * 1000 + ) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -55,15 +96,17 @@ class EventPushActionsStore(SQLBaseStore): """ values = [] for uid, actions in tuples: + is_highlight = 1 if _action_has_highlight(actions) else 0 + values.append({ 'room_id': event.room_id, 'event_id': event.event_id, 'user_id': uid, - 'actions': json.dumps(actions), + 'actions': _serialize_action(actions, is_highlight), 'stream_ordering': event.internal_metadata.stream_ordering, 'topological_ordering': event.depth, 'notif': 1, - 'highlight': 1 if _action_has_highlight(actions) else 0, + 'highlight': is_highlight, }) for uid, __ in tuples: @@ -77,66 +120,83 @@ class EventPushActionsStore(SQLBaseStore): def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): - def _get_unread_event_push_actions_by_room(txn): - sql = ( - "SELECT stream_ordering, topological_ordering" - " FROM events" - " WHERE room_id = ? AND event_id = ?" - ) - txn.execute( - sql, (room_id, last_read_event_id) - ) - results = txn.fetchall() - if len(results) == 0: - return {"notify_count": 0, "highlight_count": 0} - - stream_ordering = results[0][0] - topological_ordering = results[0][1] - token = RoomStreamToken( - topological_ordering, stream_ordering - ) - - # First get number of notifications. - # We don't need to put a notif=1 clause as all rows always have - # notif=1 - sql = ( - "SELECT count(*)" - " FROM event_push_actions ea" - " WHERE" - " user_id = ?" - " AND room_id = ?" - " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=False),) - - txn.execute(sql, (user_id, room_id)) - row = txn.fetchone() - notify_count = row[0] if row else 0 + ret = yield self.runInteraction( + "get_unread_event_push_actions_by_room", + self._get_unread_counts_by_receipt_txn, + room_id, user_id, last_read_event_id + ) + defer.returnValue(ret) - # Now get the number of highlights - sql = ( - "SELECT count(*)" - " FROM event_push_actions ea" - " WHERE" - " highlight = 1" - " AND user_id = ?" - " AND room_id = ?" - " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id, + last_read_event_id): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return {"notify_count": 0, "highlight_count": 0} - txn.execute(sql, (user_id, room_id)) - row = txn.fetchone() - highlight_count = row[0] if row else 0 + stream_ordering = results[0][0] + topological_ordering = results[0][1] - return { - "notify_count": notify_count, - "highlight_count": highlight_count, - } + return self._get_unread_counts_by_pos_txn( + txn, room_id, user_id, topological_ordering, stream_ordering + ) - ret = yield self.runInteraction( - "get_unread_event_push_actions_by_room", - _get_unread_event_push_actions_by_room + def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering, + stream_ordering): + token = RoomStreamToken( + topological_ordering, stream_ordering ) - defer.returnValue(ret) + + # First get number of notifications. + # We don't need to put a notif=1 clause as all rows always have + # notif=1 + sql = ( + "SELECT count(*)" + " FROM event_push_actions ea" + " WHERE" + " user_id = ?" + " AND room_id = ?" + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + notify_count = row[0] if row else 0 + + txn.execute(""" + SELECT notif_count FROM event_push_summary + WHERE room_id = ? AND user_id = ? AND stream_ordering > ? + """, (room_id, user_id, stream_ordering,)) + rows = txn.fetchall() + if rows: + notify_count += rows[0][0] + + # Now get the number of highlights + sql = ( + "SELECT count(*)" + " FROM event_push_actions ea" + " WHERE" + " highlight = 1" + " AND user_id = ?" + " AND room_id = ?" + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + highlight_count = row[0] if row else 0 + + return { + "notify_count": notify_count, + "highlight_count": highlight_count, + } @defer.inlineCallbacks def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): @@ -176,7 +236,8 @@ class EventPushActionsStore(SQLBaseStore): # find rooms that have a read receipt in them and return the next # push actions sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " ep.highlight " " FROM (" " SELECT room_id," " MAX(topological_ordering) as topological_ordering," @@ -217,7 +278,7 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " e.received_ts" + " ep.highlight " " FROM event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" @@ -246,7 +307,7 @@ class EventPushActionsStore(SQLBaseStore): "event_id": row[0], "room_id": row[1], "stream_ordering": row[2], - "actions": json.loads(row[3]), + "actions": _deserialize_action(row[3], row[4]), } for row in after_read_receipt + no_read_receipt ] @@ -285,7 +346,7 @@ class EventPushActionsStore(SQLBaseStore): def get_after_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " e.received_ts" + " ep.highlight, e.received_ts" " FROM (" " SELECT room_id," " MAX(topological_ordering) as topological_ordering," @@ -327,7 +388,7 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " e.received_ts" + " ep.highlight, e.received_ts" " FROM event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" @@ -357,8 +418,8 @@ class EventPushActionsStore(SQLBaseStore): "event_id": row[0], "room_id": row[1], "stream_ordering": row[2], - "actions": json.loads(row[3]), - "received_ts": row[4], + "actions": _deserialize_action(row[3], row[4]), + "received_ts": row[5], } for row in after_read_receipt + no_read_receipt ] @@ -392,7 +453,7 @@ class EventPushActionsStore(SQLBaseStore): sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," - " epa.actions, epa.profile_tag, e.received_ts" + " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" " WHERE epa.event_id = e.event_id" " AND epa.user_id = ? %s" @@ -407,7 +468,7 @@ class EventPushActionsStore(SQLBaseStore): "get_push_actions_for_user", f ) for pa in push_actions: - pa["actions"] = json.loads(pa["actions"]) + pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) defer.returnValue(push_actions) @defer.inlineCallbacks @@ -448,10 +509,14 @@ class EventPushActionsStore(SQLBaseStore): ) def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + topological_ordering, stream_ordering): """ - Purges old, stale push actions for a user and room before a given - topological_ordering + Purges old push actions for a user and room before a given + topological_ordering. + + We however keep a months worth of highlighted notifications, so that + users can still get a list of recent highlights. + Args: txn: The transcation room_id: Room ID to delete from @@ -475,10 +540,16 @@ class EventPushActionsStore(SQLBaseStore): txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering < ? AND stream_ordering < ?", + " topological_ordering <= ?" + " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) ) + txn.execute(""" + DELETE FROM event_push_summary + WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? + """, (room_id, user_id, stream_ordering)) + @defer.inlineCallbacks def _find_stream_orderings_for_times(self): yield self.runInteraction( @@ -495,6 +566,14 @@ class EventPushActionsStore(SQLBaseStore): "Found stream ordering 1 month ago: it's %d", self.stream_ordering_month_ago ) + logger.info("Searching for stream ordering 1 day ago") + self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 day ago: it's %d", + self.stream_ordering_day_ago + ) def _find_first_stream_ordering_after_ts_txn(self, txn, ts): """ @@ -534,6 +613,131 @@ class EventPushActionsStore(SQLBaseStore): return range_end + @defer.inlineCallbacks + def _rotate_notifs(self): + if self._doing_notif_rotation or self.stream_ordering_day_ago is None: + return + self._doing_notif_rotation = True + + try: + while True: + logger.info("Rotating notifications") + + caught_up = yield self.runInteraction( + "_rotate_notifs", + self._rotate_notifs_txn + ) + if caught_up: + break + yield sleep(5) + finally: + self._doing_notif_rotation = False + + def _rotate_notifs_txn(self, txn): + """Archives older notifications into event_push_summary. Returns whether + the archiving process has caught up or not. + """ + + old_rotate_stream_ordering = self._simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + # We don't to try and rotate millions of rows at once, so we cap the + # maximum stream ordering we'll rotate before. + txn.execute(""" + SELECT stream_ordering FROM event_push_actions + WHERE stream_ordering > ? + ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 + """, (old_rotate_stream_ordering,)) + stream_row = txn.fetchone() + if stream_row: + offset_stream_ordering, = stream_row + rotate_to_stream_ordering = min( + self.stream_ordering_day_ago, offset_stream_ordering + ) + caught_up = offset_stream_ordering >= self.stream_ordering_day_ago + else: + rotate_to_stream_ordering = self.stream_ordering_day_ago + caught_up = True + + logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering) + + self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) + + # We have caught up iff we were limited by `stream_ordering_day_ago` + return caught_up + + def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering): + old_rotate_stream_ordering = self._simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + # Calculate the new counts that should be upserted into event_push_summary + sql = """ + SELECT user_id, room_id, + coalesce(old.notif_count, 0) + upd.notif_count, + upd.stream_ordering, + old.user_id + FROM ( + SELECT user_id, room_id, count(*) as notif_count, + max(stream_ordering) as stream_ordering + FROM event_push_actions + WHERE ? <= stream_ordering AND stream_ordering < ? + AND highlight = 0 + GROUP BY user_id, room_id + ) AS upd + LEFT JOIN event_push_summary AS old USING (user_id, room_id) + """ + + txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,)) + rows = txn.fetchall() + + logger.info("Rotating notifications, handling %d rows", len(rows)) + + # If the `old.user_id` above is NULL then we know there isn't already an + # entry in the table, so we simply insert it. Otherwise we update the + # existing table. + self._simple_insert_many_txn( + txn, + table="event_push_summary", + values=[ + { + "user_id": row[0], + "room_id": row[1], + "notif_count": row[2], + "stream_ordering": row[3], + } + for row in rows if row[4] is None + ] + ) + + txn.executemany( + """ + UPDATE event_push_summary SET notif_count = ?, stream_ordering = ? + WHERE user_id = ? AND room_id = ? + """, + ((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None) + ) + + txn.execute( + "DELETE FROM event_push_actions" + " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0", + (old_rotate_stream_ordering, rotate_to_stream_ordering,) + ) + + logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) + + txn.execute( + "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", + (rotate_to_stream_ordering,) + ) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 7460f98a1f..4d1590d2b4 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -15,7 +15,7 @@ from ._base import SQLBaseStore from synapse.api.constants import PresenceState -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from collections import namedtuple from twisted.internet import defer @@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore): self.presence_stream_cache.entity_has_changed, state.user_id, stream_id, ) + self._invalidate_cache_and_stream( + txn, self._get_presence_for_user, (state.user_id,) + ) # Actually insert new rows self._simple_insert_many_txn( @@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore): "get_all_presence_updates", get_all_presence_updates_txn ) - @defer.inlineCallbacks + @cached() + def _get_presence_for_user(self, user_id): + raise NotImplementedError() + + @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids", + num_args=1, inlineCallbacks=True) def get_presence_for_users(self, user_ids): rows = yield self._simple_select_many_batch( table="presence_stream", @@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore): for row in rows: row["currently_active"] = bool(row["currently_active"]) - defer.returnValue([UserPresenceState(**row) for row in rows]) + defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows}) def get_current_presence_token(self): return self._presence_id_gen.get_current_token() diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index f72d15f5ed..5cf41501ea 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore): room_id=room_id, user_id=user_id, topological_ordering=topological_ordering, + stream_ordering=stream_ordering, ) return True diff --git a/synapse/storage/schema/delta/40/event_push_summary.sql b/synapse/storage/schema/delta/40/event_push_summary.sql new file mode 100644 index 0000000000..3918f0b794 --- /dev/null +++ b/synapse/storage/schema/delta/40/event_push_summary.sql @@ -0,0 +1,37 @@ +/* Copyright 2017 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Aggregate of old notification counts that have been deleted out of the +-- main event_push_actions table. This count does not include those that were +-- highlights, as they remain in the event_push_actions table. +CREATE TABLE event_push_summary ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notif_count BIGINT NOT NULL, + stream_ordering BIGINT NOT NULL +); + +CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id); + + +-- The stream ordering up to which we have aggregated the event_push_actions +-- table into event_push_summary +CREATE TABLE event_push_summary_stream_ordering ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); diff --git a/synapse/storage/schema/delta/40/pushers.sql b/synapse/storage/schema/delta/40/pushers.sql new file mode 100644 index 0000000000..054a223f14 --- /dev/null +++ b/synapse/storage/schema/delta/40/pushers.sql @@ -0,0 +1,39 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag TEXT NOT NULL, + kind TEXT NOT NULL, + app_id TEXT NOT NULL, + app_display_name TEXT NOT NULL, + device_display_name TEXT NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang TEXT, + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) +); + +INSERT INTO pushers2 SELECT * FROM PUSHERS; + +DROP TABLE PUSHERS; + +ALTER TABLE pushers2 RENAME TO pushers; diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1b3800eb6a..84482d8285 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -413,7 +413,19 @@ class StateStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_ids_for_events(self, event_ids, types): + def get_state_ids_for_events(self, event_ids, types=None): + """ + Get the state dicts corresponding to a list of events + + Args: + event_ids(list(str)): events whose state should be returned + types(list[(str, str)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. May be None, which + matches any key + + Returns: + A deferred dict from event_id -> (type, state_key) -> state_event + """ event_to_groups = yield self._get_state_group_for_events( event_ids, ) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index e9044afa2e..3135488353 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -17,9 +17,15 @@ from twisted.internet import defer import tests.unittest import tests.utils +from mock import Mock USER_ID = "@user:example.com" +PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}] +HIGHLIGHT = [ + "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"} +] + class EventPushActionsStoreTestCase(tests.unittest.TestCase): @@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): yield self.store.get_unread_push_actions_for_user_in_range_for_email( USER_ID, 0, 1000, 20 ) + + @defer.inlineCallbacks + def test_count_aggregation(self): + room_id = "!foo:example.com" + user_id = "@user1235:example.com" + + @defer.inlineCallbacks + def _assert_counts(noitf_count, highlight_count): + counts = yield self.store.runInteraction( + "", self.store._get_unread_counts_by_pos_txn, + room_id, user_id, 0, 0 + ) + self.assertEquals( + counts, + {"notify_count": noitf_count, "highlight_count": highlight_count} + ) + + def _inject_actions(stream, action): + event = Mock() + event.room_id = room_id + event.event_id = "$test:example.com" + event.internal_metadata.stream_ordering = stream + event.depth = stream + + tuples = [(user_id, action)] + + return self.store.runInteraction( + "", self.store._set_push_actions_for_event_and_users_txn, + event, tuples + ) + + def _rotate(stream): + return self.store.runInteraction( + "", self.store._rotate_notifs_before_txn, stream + ) + + def _mark_read(stream, depth): + return self.store.runInteraction( + "", self.store._remove_old_push_actions_before_txn, + room_id, user_id, depth, stream + ) + + yield _assert_counts(0, 0) + yield _inject_actions(1, PlAIN_NOTIF) + yield _assert_counts(1, 0) + yield _rotate(2) + yield _assert_counts(1, 0) + + yield _inject_actions(3, PlAIN_NOTIF) + yield _assert_counts(2, 0) + yield _rotate(4) + yield _assert_counts(2, 0) + + yield _inject_actions(5, PlAIN_NOTIF) + yield _mark_read(3, 3) + yield _assert_counts(1, 0) + + yield _mark_read(5, 5) + yield _assert_counts(0, 0) + + yield _inject_actions(6, PlAIN_NOTIF) + yield _rotate(7) + + yield self.store._simple_delete( + table="event_push_actions", + keyvalues={"1": 1}, + desc="", + ) + + yield _assert_counts(1, 0) + + yield _mark_read(7, 7) + yield _assert_counts(0, 0) + + yield _inject_actions(8, HIGHLIGHT) + yield _assert_counts(1, 1) + yield _rotate(9) + yield _assert_counts(1, 1) + yield _rotate(10) + yield _assert_counts(1, 1) |