From 3dec9c66b3c2af1bf5b7283d7db443b65ebbd8a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Feb 2018 12:01:08 +0000 Subject: Split out RoomMemberStore --- synapse/storage/roommember.py | 360 +++++++++++++++++++++--------------------- 1 file changed, 181 insertions(+), 179 deletions(-) (limited to 'synapse/storage/roommember.py') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3e77fd3901..6574fe74b4 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -17,7 +17,7 @@ from twisted.internet import defer from collections import namedtuple -from ._base import SQLBaseStore +from synapse.storage.events import EventsWorkerStore from synapse.util.async import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks @@ -48,97 +48,7 @@ ProfileInfo = namedtuple( _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" -class RoomMemberStore(SQLBaseStore): - def __init__(self, db_conn, hs): - super(RoomMemberStore, self).__init__(db_conn, hs) - self.register_background_update_handler( - _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile - ) - - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database. - """ - self._simple_insert_many_txn( - txn, - table="room_memberships", - values=[ - { - "event_id": event.event_id, - "user_id": event.state_key, - "sender": event.user_id, - "room_id": event.room_id, - "membership": event.membership, - "display_name": event.content.get("displayname", None), - "avatar_url": event.content.get("avatar_url", None), - } - for event in events - ] - ) - - for event in events: - txn.call_after( - self._membership_stream_cache.entity_has_changed, - event.state_key, event.internal_metadata.stream_ordering - ) - txn.call_after( - self.get_invited_rooms_for_user.invalidate, (event.state_key,) - ) - - # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. - # The only current event that can also be an outlier is if its an - # invite that has come in across federation. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_invite_from_remote() - ) - is_mine = self.hs.is_mine_id(event.state_key) - if is_new_state and is_mine: - if event.membership == Membership.INVITE: - self._simple_insert_txn( - txn, - table="local_invites", - values={ - "event_id": event.event_id, - "invitee": event.state_key, - "inviter": event.sender, - "room_id": event.room_id, - "stream_id": event.internal_metadata.stream_ordering, - } - ) - else: - sql = ( - "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - txn.execute(sql, ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - )) - - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, ( - stream_ordering, - True, - room_id, - user_id, - )) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.runInteraction("locally_reject_invite", f, stream_ordering) - +class RoomMemberWorkerStore(EventsWorkerStore): @cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True) def get_hosts_in_room(self, room_id, cache_context): """Returns the set of all hosts currently in the room @@ -295,89 +205,6 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(user_who_share_room) - def forget(self, user_id, room_id): - """Indicate that user_id wishes to discard history for room_id.""" - def f(txn): - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - ) - txn.execute(sql, (user_id, room_id)) - - txn.call_after(self.was_forgotten_at.invalidate_all) - txn.call_after(self.did_forget.invalidate, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.who_forgot_in_room, (room_id,) - ) - return self.runInteraction("forget_membership", f) - - @cachedInlineCallbacks(num_args=2) - def did_forget(self, user_id, room_id): - """Returns whether user_id has elected to discard history for room_id. - - Returns False if they have since re-joined.""" - def f(txn): - sql = ( - "SELECT" - " COUNT(*)" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " forgotten = 0" - ) - txn.execute(sql, (user_id, room_id)) - rows = txn.fetchall() - return rows[0][0] - count = yield self.runInteraction("did_forget_membership", f) - defer.returnValue(count == 0) - - @cachedInlineCallbacks(num_args=3) - def was_forgotten_at(self, user_id, room_id, event_id): - """Returns whether user_id has elected to discard history for room_id at - event_id. - - event_id must be a membership event.""" - def f(txn): - sql = ( - "SELECT" - " forgotten" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " event_id = ?" - ) - txn.execute(sql, (user_id, room_id, event_id)) - rows = txn.fetchall() - return rows[0][0] - forgot = yield self.runInteraction("did_forget_membership_at", f) - defer.returnValue(forgot == 1) - - @cached() - def who_forgot_in_room(self, room_id): - return self._simple_select_list( - table="room_memberships", - retcols=("user_id", "event_id"), - keyvalues={ - "room_id": room_id, - "forgotten": 1, - }, - desc="who_forgot" - ) - def get_joined_users_from_context(self, event, context): state_group = context.state_group if not state_group: @@ -600,6 +427,185 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(joined_hosts) + @cached(max_entries=10000, iterable=True) + def _get_joined_hosts_cache(self, room_id): + return _JoinedHostsCache(self, room_id) + + +class RoomMemberStore(RoomMemberWorkerStore): + def __init__(self, db_conn, hs): + super(RoomMemberStore, self).__init__(db_conn, hs) + self.register_background_update_handler( + _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile + ) + + def _store_room_members_txn(self, txn, events, backfilled): + """Store a room member in the database. + """ + self._simple_insert_many_txn( + txn, + table="room_memberships", + values=[ + { + "event_id": event.event_id, + "user_id": event.state_key, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), + } + for event in events + ] + ) + + for event in events: + txn.call_after( + self._membership_stream_cache.entity_has_changed, + event.state_key, event.internal_metadata.stream_ordering + ) + txn.call_after( + self.get_invited_rooms_for_user.invalidate, (event.state_key,) + ) + + # We update the local_invites table only if the event is "current", + # i.e., its something that has just happened. + # The only current event that can also be an outlier is if its an + # invite that has come in across federation. + is_new_state = not backfilled and ( + not event.internal_metadata.is_outlier() + or event.internal_metadata.is_invite_from_remote() + ) + is_mine = self.hs.is_mine_id(event.state_key) + if is_new_state and is_mine: + if event.membership == Membership.INVITE: + self._simple_insert_txn( + txn, + table="local_invites", + values={ + "event_id": event.event_id, + "invitee": event.state_key, + "inviter": event.sender, + "room_id": event.room_id, + "stream_id": event.internal_metadata.stream_ordering, + } + ) + else: + sql = ( + "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + txn.execute(sql, ( + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.state_key, + )) + + @defer.inlineCallbacks + def locally_reject_invite(self, user_id, room_id): + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, ( + stream_ordering, + True, + room_id, + user_id, + )) + + with self._stream_id_gen.get_next() as stream_ordering: + yield self.runInteraction("locally_reject_invite", f, stream_ordering) + + def forget(self, user_id, room_id): + """Indicate that user_id wishes to discard history for room_id.""" + def f(txn): + sql = ( + "UPDATE" + " room_memberships" + " SET" + " forgotten = 1" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + ) + txn.execute(sql, (user_id, room_id)) + + txn.call_after(self.was_forgotten_at.invalidate_all) + txn.call_after(self.did_forget.invalidate, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.who_forgot_in_room, (room_id,) + ) + return self.runInteraction("forget_membership", f) + + @cachedInlineCallbacks(num_args=2) + def did_forget(self, user_id, room_id): + """Returns whether user_id has elected to discard history for room_id. + + Returns False if they have since re-joined.""" + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + " FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " forgotten = 0" + ) + txn.execute(sql, (user_id, room_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) + defer.returnValue(count == 0) + + @cachedInlineCallbacks(num_args=3) + def was_forgotten_at(self, user_id, room_id, event_id): + """Returns whether user_id has elected to discard history for room_id at + event_id. + + event_id must be a membership event.""" + def f(txn): + sql = ( + "SELECT" + " forgotten" + " FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " event_id = ?" + ) + txn.execute(sql, (user_id, room_id, event_id)) + rows = txn.fetchall() + return rows[0][0] + forgot = yield self.runInteraction("did_forget_membership_at", f) + defer.returnValue(forgot == 1) + + @cached() + def who_forgot_in_room(self, room_id): + return self._simple_select_list( + table="room_memberships", + retcols=("user_id", "event_id"), + keyvalues={ + "room_id": room_id, + "forgotten": 1, + }, + desc="who_forgot" + ) + @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( @@ -675,10 +681,6 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(result) - @cached(max_entries=10000, iterable=True) - def _get_joined_hosts_cache(self, room_id): - return _JoinedHostsCache(self, room_id) - class _JoinedHostsCache(object): """Cache for joined hosts in a room that is optimised to handle updates -- cgit 1.5.1 From 70349872c2dcff20a7b174bf0fcfdd5b8e47eec3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Feb 2018 11:14:35 +0000 Subject: Update copyright --- synapse/replication/slave/storage/events.py | 1 + synapse/storage/events.py | 1 + synapse/storage/roommember.py | 1 + 3 files changed, 3 insertions(+) (limited to 'synapse/storage/roommember.py') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 0dc87aee7f..ef7a42d801 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 681a33314d..3e3229b408 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 6574fe74b4..b9158b9896 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.5.1 From 493e25d5545389264f696be0e07544bf82a0818a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Feb 2018 12:01:36 +0000 Subject: Move storage functions for push calculations This will allow push actions for an event to be calculated on workers. --- synapse/app/pusher.py | 5 -- synapse/app/synchrotron.py | 8 +-- synapse/storage/event_push_actions.py | 126 +++++++++++++++++----------------- synapse/storage/push_rule.py | 14 +++- synapse/storage/pusher.py | 22 +++--- synapse/storage/roommember.py | 24 +++---- 6 files changed, 101 insertions(+), 98 deletions(-) (limited to 'synapse/storage/roommember.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 32ccea3f13..98a4a7c62c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -32,7 +32,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine -from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole @@ -75,10 +74,6 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - who_forgot_in_room = ( - RoomMemberStore.__dict__["who_forgot_in_room"] - ) - class PusherServer(HomeServer): def setup(self): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f87531f1b6..abe91dcfbd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -62,8 +62,6 @@ logger = logging.getLogger("synapse.app.synchrotron") class SynchrotronSlavedStore( - SlavedPushRuleStore, - SlavedEventStore, SlavedReceiptsStore, SlavedAccountDataStore, SlavedApplicationServiceStore, @@ -73,14 +71,12 @@ class SynchrotronSlavedStore( SlavedGroupServerStore, SlavedDeviceInboxStore, SlavedDeviceStore, + SlavedPushRuleStore, + SlavedEventStore, SlavedClientIpStore, RoomStore, BaseSlavedStore, ): - who_forgot_in_room = ( - RoomMemberStore.__dict__["who_forgot_in_room"] - ) - did_forget = ( RoomMemberStore.__dict__["did_forget"] ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index fe6887414e..6454045c2d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -380,6 +380,69 @@ class EventPushActionsWorkerStore(SQLBaseStore): # Now return the first `limit` defer.returnValue(notifs[:limit]) + def add_push_actions_to_staging(self, event_id, user_id_actions): + """Add the push actions for the event to the push action staging area. + + Args: + event_id (str) + user_id_actions (dict[str, list[dict|str])]): A dictionary mapping + user_id to list of push actions, where an action can either be + a string or dict. + + Returns: + Deferred + """ + + if not user_id_actions: + return + + # This is a helper function for generating the necessary tuple that + # can be used to inert into the `event_push_actions_staging` table. + def _gen_entry(user_id, actions): + is_highlight = 1 if _action_has_highlight(actions) else 0 + return ( + event_id, # event_id column + user_id, # user_id column + _serialize_action(actions, is_highlight), # actions column + 1, # notif column + is_highlight, # highlight column + ) + + def _add_push_actions_to_staging_txn(txn): + # We don't use _simple_insert_many here to avoid the overhead + # of generating lists of dicts. + + sql = """ + INSERT INTO event_push_actions_staging + (event_id, user_id, actions, notif, highlight) + VALUES (?, ?, ?, ?, ?) + """ + + txn.executemany(sql, ( + _gen_entry(user_id, actions) + for user_id, actions in user_id_actions.iteritems() + )) + + return self.runInteraction( + "add_push_actions_to_staging", _add_push_actions_to_staging_txn + ) + + def remove_push_actions_from_staging(self, event_id): + """Called if we failed to persist the event to ensure that stale push + actions don't build up in the DB + + Args: + event_id (str) + """ + + return self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" @@ -775,69 +838,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore): (rotate_to_stream_ordering,) ) - def add_push_actions_to_staging(self, event_id, user_id_actions): - """Add the push actions for the event to the push action staging area. - - Args: - event_id (str) - user_id_actions (dict[str, list[dict|str])]): A dictionary mapping - user_id to list of push actions, where an action can either be - a string or dict. - - Returns: - Deferred - """ - - if not user_id_actions: - return - - # This is a helper function for generating the necessary tuple that - # can be used to inert into the `event_push_actions_staging` table. - def _gen_entry(user_id, actions): - is_highlight = 1 if _action_has_highlight(actions) else 0 - return ( - event_id, # event_id column - user_id, # user_id column - _serialize_action(actions, is_highlight), # actions column - 1, # notif column - is_highlight, # highlight column - ) - - def _add_push_actions_to_staging_txn(txn): - # We don't use _simple_insert_many here to avoid the overhead - # of generating lists of dicts. - - sql = """ - INSERT INTO event_push_actions_staging - (event_id, user_id, actions, notif, highlight) - VALUES (?, ?, ?, ?, ?) - """ - - txn.executemany(sql, ( - _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.iteritems() - )) - - return self.runInteraction( - "add_push_actions_to_staging", _add_push_actions_to_staging_txn - ) - - def remove_push_actions_from_staging(self, event_id): - """Called if we failed to persist the event to ensure that stale push - actions don't build up in the DB - - Args: - event_id (str) - """ - - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) - def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 583efb7bdf..04a0b59a39 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -15,6 +15,10 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.storage.appservice import ApplicationServiceWorkerStore +from synapse.storage.pusher import PusherWorkerStore +from synapse.storage.receipts import ReceiptsWorkerStore +from synapse.storage.roommember import RoomMemberWorkerStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.push.baserules import list_with_base_rules @@ -51,7 +55,11 @@ def _load_rules(rawrules, enabled_map): return rules -class PushRulesWorkerStore(SQLBaseStore): +class PushRulesWorkerStore(ApplicationServiceWorkerStore, + ReceiptsWorkerStore, + PusherWorkerStore, + RoomMemberWorkerStore, + SQLBaseStore): """This is an abstract base class where subclasses must implement `get_max_push_rules_stream_id` which can be called in the initializer. """ @@ -140,8 +148,6 @@ class PushRulesWorkerStore(SQLBaseStore): "have_push_rules_changed", have_push_rules_changed_txn ) - -class PushRuleStore(PushRulesWorkerStore): @cachedList(cached_method_name="get_push_rules_for_user", list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules(self, user_ids): @@ -281,6 +287,8 @@ class PushRuleStore(PushRulesWorkerStore): results.setdefault(row['user_name'], {})[row['rule_id']] = enabled defer.returnValue(results) + +class PushRuleStore(PushRulesWorkerStore): @defer.inlineCallbacks def add_push_rule( self, user_id, rule_id, priority_class, conditions, actions, diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index f4af3e4caa..307660b99a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -175,11 +175,6 @@ class PusherWorkerStore(SQLBaseStore): "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) - -class PusherStore(PusherWorkerStore): - def get_pushers_stream_token(self): - return self._pushers_id_gen.get_current_token() - @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator @@ -201,6 +196,11 @@ class PusherStore(PusherWorkerStore): defer.returnValue(result) + +class PusherStore(PusherWorkerStore): + def get_pushers_stream_token(self): + return self._pushers_id_gen.get_current_token() + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, @@ -233,14 +233,18 @@ class PusherStore(PusherWorkerStore): ) if newly_inserted: - # get_if_user_has_pusher only cares if the user has - # at least *one* pusher. - self.get_if_user_has_pusher.invalidate(user_id,) + self.runInteraction( + "add_pusher", + self._invalidate_cache_and_stream, + self.get_if_user_has_pusher, (user_id,) + ) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): - txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) + self._invalidate_cache_and_stream( + txn, self.get_if_user_has_pusher, (user_id,) + ) self._simple_delete_one_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index b9158b9896..d79877dac7 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -432,6 +432,18 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) + @cached() + def who_forgot_in_room(self, room_id): + return self._simple_select_list( + table="room_memberships", + retcols=("user_id", "event_id"), + keyvalues={ + "room_id": room_id, + "forgotten": 1, + }, + desc="who_forgot" + ) + class RoomMemberStore(RoomMemberWorkerStore): def __init__(self, db_conn, hs): @@ -595,18 +607,6 @@ class RoomMemberStore(RoomMemberWorkerStore): forgot = yield self.runInteraction("did_forget_membership_at", f) defer.returnValue(forgot == 1) - @cached() - def who_forgot_in_room(self, room_id): - return self._simple_select_list( - table="room_memberships", - retcols=("user_id", "event_id"), - keyvalues={ - "room_id": room_id, - "forgotten": 1, - }, - desc="who_forgot" - ) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( -- cgit 1.5.1 From 8cb44da4aa569188faa2a94aae6bc093aa8e22ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 12:06:19 +0000 Subject: Fix race in sync when joining room The race happens when the user joins a room at the same time as doing a sync. We fetch the current token and then get the rooms the user is in. If the join happens after the current token, but before we get the rooms we end up sending down a partial room entry in the sync. This is fixed by looking at the stream ordering of the membership returned by get_rooms_for_user, and handling the case when that stream ordering is after the current token. --- synapse/handlers/sync.py | 103 ++++++++++++++++++++++++++++++------------ synapse/storage/events.py | 2 +- synapse/storage/roommember.py | 27 ++++++++++- 3 files changed, 102 insertions(+), 30 deletions(-) (limited to 'synapse/storage/roommember.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 56b86356f2..163d80417e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -235,10 +235,10 @@ class SyncHandler(object): defer.returnValue(rules) @defer.inlineCallbacks - def ephemeral_by_room(self, sync_config, now_token, since_token=None): + def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): """Get the ephemeral events for each room the user is in Args: - sync_config (SyncConfig): The flags, filters and user for the sync. + sync_result_builder(SyncResultBuilder) now_token (StreamToken): Where the server is currently up to. since_token (StreamToken): Where the server was when the client last synced. @@ -248,10 +248,12 @@ class SyncHandler(object): typing events for that room. """ + sync_config = sync_result_builder.sync_config + with Measure(self.clock, "ephemeral_by_room"): typing_key = since_token.typing_key if since_token else "0" - room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = sync_result_builder.joined_room_ids typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( @@ -565,10 +567,22 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + else: + joined_room_ids = yield self.get_rooms_for_user_at( + user_id, now_token.room_stream_id, + ) + sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, + joined_room_ids=joined_room_ids, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( @@ -603,7 +617,6 @@ class SyncHandler(object): device_id = sync_config.device_id one_time_key_counts = {} if device_id: - user_id = sync_config.user.to_string() one_time_key_counts = yield self.store.count_e2e_one_time_keys( user_id, device_id ) @@ -891,7 +904,7 @@ class SyncHandler(object): ephemeral_by_room = {} else: now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builder.sync_config, + sync_result_builder, now_token=sync_result_builder.now_token, since_token=sync_result_builder.since_token, ) @@ -996,16 +1009,8 @@ class SyncHandler(object): if rooms_changed: defer.returnValue(True) - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - else: - joined_room_ids = yield self.store.get_rooms_for_user(user_id) - stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream - for room_id in joined_room_ids: + for room_id in sync_result_builder.joined_room_ids: if self.store.has_room_changed_since(room_id, stream_id): defer.returnValue(True) defer.returnValue(False) @@ -1029,14 +1034,6 @@ class SyncHandler(object): assert since_token - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - else: - joined_room_ids = yield self.store.get_rooms_for_user(user_id) - # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key @@ -1059,7 +1056,7 @@ class SyncHandler(object): # we do send down the room, and with full state, where necessary old_state_ids = None - if room_id in joined_room_ids and non_joins: + if room_id in sync_result_builder.joined_room_ids and non_joins: # Always include if the user (re)joined the room, especially # important so that device list changes are calculated correctly. # If there are non join member events, but we are still in the room, @@ -1069,7 +1066,7 @@ class SyncHandler(object): # User is in the room so we don't need to do the invite/leave checks continue - if room_id in joined_room_ids or has_join: + if room_id in sync_result_builder.joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) old_mem_ev = None @@ -1081,7 +1078,7 @@ class SyncHandler(object): newly_joined_rooms.append(room_id) # If user is in the room then we don't need to do the invite/leave checks - if room_id in joined_room_ids: + if room_id in sync_result_builder.joined_room_ids: continue if not non_joins: @@ -1148,7 +1145,7 @@ class SyncHandler(object): # Get all events for rooms we're currently joined to. room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, + room_ids=sync_result_builder.joined_room_ids, from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, @@ -1156,7 +1153,7 @@ class SyncHandler(object): # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. - for room_id in joined_room_ids: + for room_id in sync_result_builder.joined_room_ids: room_entry = room_to_events.get(room_id, None) if room_entry: @@ -1364,6 +1361,54 @@ class SyncHandler(object): else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) + @defer.inlineCallbacks + def get_rooms_for_user_at(self, user_id, stream_ordering): + """Get set of joined rooms for a user at the given stream ordering. + + The stream ordering *must* be recent, otherwise this may throw an + exception if older than a month. (This function is called with the + current token, which should be perfectly fine). + + Args: + user_id (str) + stream_ordering (int) + + ReturnValue: + Deferred[frozenset[str]]: Set of room_ids the user is in at given + stream_ordering. + """ + joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering( + user_id, + ) + + joined_room_ids = set() + + # We need to check that the stream ordering of the join for each room + # is before the stream_ordering asked for. This might not be the case + # if the user joins a room between us getting the current token and + # calling `get_rooms_for_user_with_stream_ordering`. + # If the membership's stream ordering is after the given stream + # ordering, we need to go and work out if the user was in the room + # before. + for room_id, membeship_stream_ordering in joined_rooms: + if membeship_stream_ordering <= stream_ordering: + joined_room_ids.add(room_id) + continue + + logger.info("SH joined_room_ids membership after current token") + + extrems = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering, + ) + users_in_room = yield self.state.get_current_user_in_room( + room_id, extrems, + ) + if user_id in users_in_room: + joined_room_ids.add(room_id) + + joined_room_ids = frozenset(joined_room_ids) + defer.returnValue(joined_room_ids) + def _action_has_highlight(actions): for action in actions: @@ -1413,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" - def __init__(self, sync_config, full_state, since_token, now_token): + def __init__(self, sync_config, full_state, since_token, now_token, + joined_room_ids): """ Args: sync_config(SyncConfig) @@ -1425,6 +1471,7 @@ class SyncResultBuilder(object): self.full_state = full_state self.since_token = since_token self.now_token = now_token + self.joined_room_ids = joined_room_ids self.presence = [] self.account_data = [] diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 057b1be4d5..826fad307e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -754,7 +754,7 @@ class EventsStore(EventsWorkerStore): for member in members_changed: self._invalidate_cache_and_stream( - txn, self.get_rooms_for_user, (member,) + txn, self.get_rooms_for_user_with_stream_ordering, (member,) ) for host in set(get_domain_from_id(u) for u in members_changed): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index d79877dac7..52e19e16b0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -38,6 +38,11 @@ RoomsForUser = namedtuple( ("room_id", "sender", "membership", "event_id", "stream_ordering") ) +GetRoomsForUserWithStreamOrdering = namedtuple( + "_GetRoomsForUserWithStreamOrdering", + ("room_id", "stream_ordering",) +) + # We store this using a namedtuple so that we save about 3x space over using a # dict. @@ -181,12 +186,32 @@ class RoomMemberWorkerStore(EventsWorkerStore): return results @cachedInlineCallbacks(max_entries=500000, iterable=True) - def get_rooms_for_user(self, user_id): + def get_rooms_for_user_with_stream_ordering(self, user_id): """Returns a set of room_ids the user is currently joined to + + Args: + user_id (str) + + Returns: + Deferred[frozenset[GetRoomsForUserWithStreamOrdering]]: Returns + the rooms the user is in currently, along with the stream ordering + of the most recent join for that user and room. """ rooms = yield self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], ) + defer.returnValue(frozenset( + GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering) + for r in rooms + )) + + @defer.inlineCallbacks + def get_rooms_for_user(self, user_id, on_invalidate=None): + """Returns a set of room_ids the user is currently joined to + """ + rooms = yield self.get_rooms_for_user_with_stream_ordering( + user_id, on_invalidate=on_invalidate, + ) defer.returnValue(frozenset(r.room_id for r in rooms)) @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True) -- cgit 1.5.1 From 926ba76e23ea9d55638baff541cdfaeb9e01ac47 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Mar 2018 23:43:31 +0000 Subject: Replace ujson with simplejson --- synapse/api/filtering.py | 2 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/message.py | 6 +++--- synapse/http/server.py | 5 ++--- synapse/replication/tcp/commands.py | 2 +- synapse/rest/client/v1/room.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/storage/account_data.py | 2 +- synapse/storage/background_updates.py | 2 +- synapse/storage/deviceinbox.py | 12 ++++++------ synapse/storage/devices.py | 2 +- synapse/storage/end_to_end_keys.py | 2 +- synapse/storage/event_push_actions.py | 2 +- synapse/storage/events.py | 4 ++-- synapse/storage/receipts.py | 2 +- synapse/storage/room.py | 2 +- synapse/storage/roommember.py | 2 +- synapse/storage/schema/delta/25/fts.py | 4 ++-- synapse/storage/schema/delta/27/ts.py | 2 +- synapse/storage/schema/delta/31/search_update.py | 4 ++-- synapse/storage/schema/delta/33/event_fields.py | 4 ++-- synapse/storage/search.py | 2 +- synapse/storage/tags.py | 2 +- synapse/storage/transactions.py | 2 +- 25 files changed, 37 insertions(+), 38 deletions(-) (limited to 'synapse/storage/roommember.py') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 83206348e5..db43219d24 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -17,7 +17,7 @@ from synapse.storage.presence import UserPresenceState from synapse.types import UserID, RoomID from twisted.internet import defer -import ujson as json +import simplejson as json import jsonschema from jsonschema import FormatChecker diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 668a90e495..ce2c87e400 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ujson as json +import simplejson as json import logging from canonicaljson import encode_canonical_json diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21f1717dd2..d7413833ed 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -35,7 +35,7 @@ from canonicaljson import encode_canonical_json import logging import random -import ujson +import simplejson logger = logging.getLogger(__name__) @@ -561,8 +561,8 @@ class MessageHandler(BaseHandler): # Ensure that we can round trip before trying to persist in db try: - dump = ujson.dumps(unfreeze(event.content)) - ujson.loads(dump) + dump = simplejson.dumps(unfreeze(event.content)) + simplejson.loads(dump) except Exception: logger.exception("Failed to encode content: %r", event.content) raise diff --git a/synapse/http/server.py b/synapse/http/server.py index 25466cd292..f1e9002e4d 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -36,7 +36,7 @@ from twisted.web.util import redirectTo import collections import logging import urllib -import ujson +import simplejson logger = logging.getLogger(__name__) @@ -370,8 +370,7 @@ def respond_with_json(request, code, json_object, send_cors=False, if canonical_json or synapse.events.USE_FROZEN_DICTS: json_bytes = encode_canonical_json(json_object) else: - # ujson doesn't like frozen_dicts. - json_bytes = ujson.dumps(json_object, ensure_ascii=False) + json_bytes = simplejson.dumps(json_object) return respond_with_json_bytes( request, code, json_bytes, diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 171227cce2..9633404f73 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -19,7 +19,7 @@ allowed to be sent by which side. """ import logging -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 75b735b47d..80989731fa 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -29,7 +29,7 @@ from synapse.http.servlet import ( import logging import urllib -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a0a8e4b8e4..eb91c0b293 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit import itertools import logging -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 40d2e664eb..a413cb6226 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -35,7 +35,7 @@ import os import re import fnmatch import cgi -import ujson as json +import simplejson as json import urlparse import itertools import datetime diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index 56a0bde549..40a2ad8d05 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks -import ujson as json +import simplejson as json import logging logger = logging.getLogger(__name__) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 11a1b942f1..8f3bff311a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -19,7 +19,7 @@ from . import engines from twisted.internet import defer -import ujson as json +import simplejson as json import logging logger = logging.getLogger(__name__) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 548e795daf..a879e5bfc1 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -import ujson +import simplejson from twisted.internet import defer @@ -85,7 +85,7 @@ class DeviceInboxStore(BackgroundUpdateStore): ) rows = [] for destination, edu in remote_messages_by_destination.items(): - edu_json = ujson.dumps(edu) + edu_json = simplejson.dumps(edu) rows.append((destination, stream_id, now_ms, edu_json)) txn.executemany(sql, rows) @@ -177,7 +177,7 @@ class DeviceInboxStore(BackgroundUpdateStore): " WHERE user_id = ?" ) txn.execute(sql, (user_id,)) - message_json = ujson.dumps(messages_by_device["*"]) + message_json = simplejson.dumps(messages_by_device["*"]) for row in txn: # Add the message for all devices for this user on this # server. @@ -199,7 +199,7 @@ class DeviceInboxStore(BackgroundUpdateStore): # Only insert into the local inbox if the device exists on # this server device = row[0] - message_json = ujson.dumps(messages_by_device[device]) + message_json = simplejson.dumps(messages_by_device[device]) messages_json_for_user[device] = message_json if messages_json_for_user: @@ -253,7 +253,7 @@ class DeviceInboxStore(BackgroundUpdateStore): messages = [] for row in txn: stream_pos = row[0] - messages.append(ujson.loads(row[1])) + messages.append(simplejson.loads(row[1])) if len(messages) < limit: stream_pos = current_stream_id return (messages, stream_pos) @@ -389,7 +389,7 @@ class DeviceInboxStore(BackgroundUpdateStore): messages = [] for row in txn: stream_pos = row[0] - messages.append(ujson.loads(row[1])) + messages.append(simplejson.loads(row[1])) if len(messages) < limit: stream_pos = current_stream_id return (messages, stream_pos) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index bd2effdf34..712106b83a 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import ujson as json +import simplejson as json from twisted.internet import defer diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2cebb203c6..ff8538ddf8 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.util.caches.descriptors import cached from canonicaljson import encode_canonical_json -import ujson as json +import simplejson as json from ._base import SQLBaseStore diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 8efe2fd4bb..575d710d5d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -21,7 +21,7 @@ from synapse.types import RoomStreamToken from .stream import lower_bound import logging -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d08f7571d7..bcca563293 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -38,7 +38,7 @@ from functools import wraps import synapse.metrics import logging -import ujson as json +import simplejson as json # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 @@ -56,7 +56,7 @@ event_counter = metrics.register_counter( def encode_json(json_object): if USE_FROZEN_DICTS: - # ujson doesn't like frozen_dicts + # simplejson doesn't like frozen_dicts return encode_canonical_json(json_object) else: return json.dumps(json_object, ensure_ascii=False) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 12b3cc7f5f..2c3aa33693 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -20,7 +20,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import logging -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 23688430b7..2051d8506d 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -23,7 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging -import ujson as json +import simplejson as json import re logger = logging.getLogger(__name__) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3e77fd3901..c1ca299285 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -27,7 +27,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id import logging -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index 4269ac69ad..e7351c3ae6 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -17,7 +17,7 @@ import logging from synapse.storage.prepare_database import get_statements from synapse.storage.engines import PostgresEngine, Sqlite3Engine -import ujson +import simplejson logger = logging.getLogger(__name__) @@ -66,7 +66,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = ujson.dumps(progress) + progress_json = simplejson.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py index 71b12a2731..c0176c41ee 100644 --- a/synapse/storage/schema/delta/27/ts.py +++ b/synapse/storage/schema/delta/27/ts.py @@ -16,7 +16,7 @@ import logging from synapse.storage.prepare_database import get_statements -import ujson +import simplejson logger = logging.getLogger(__name__) diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py index 470ae0c005..fe6b7d196d 100644 --- a/synapse/storage/schema/delta/31/search_update.py +++ b/synapse/storage/schema/delta/31/search_update.py @@ -16,7 +16,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.prepare_database import get_statements import logging -import ujson +import simplejson logger = logging.getLogger(__name__) @@ -49,7 +49,7 @@ def run_create(cur, database_engine, *args, **kwargs): "rows_inserted": 0, "have_added_indexes": False, } - progress_json = ujson.dumps(progress) + progress_json = simplejson.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py index 83066cccc9..1e002f9db2 100644 --- a/synapse/storage/schema/delta/33/event_fields.py +++ b/synapse/storage/schema/delta/33/event_fields.py @@ -15,7 +15,7 @@ from synapse.storage.prepare_database import get_statements import logging -import ujson +import simplejson logger = logging.getLogger(__name__) @@ -44,7 +44,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = ujson.dumps(progress) + progress_json = simplejson.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 479b04c636..c19e4ea449 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -21,7 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging import re -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index bff73f3f04..982a500520 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -17,7 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from twisted.internet import defer -import ujson as json +import simplejson as json import logging logger = logging.getLogger(__name__) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 8f61f7ffae..f825264ea9 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -23,7 +23,7 @@ from canonicaljson import encode_canonical_json from collections import namedtuple import logging -import ujson as json +import simplejson as json logger = logging.getLogger(__name__) -- cgit 1.5.1