diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/background_updates.py | 3 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 6 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 3 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 10 | ||||
-rw-r--r-- | synapse/storage/registration.py | 9 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 26 | ||||
-rw-r--r-- | synapse/storage/state.py | 6 |
7 files changed, 16 insertions, 47 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 8af325a9f5..b7e9c716c8 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.async from ._base import SQLBaseStore from . import engines @@ -92,7 +91,7 @@ class BackgroundUpdateStore(SQLBaseStore): logger.info("Starting background schema updates") while True: - yield synapse.util.async.sleep( + yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) try: diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ce338514e8..968d2fed22 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer, reactor +from twisted.internet import defer from ._base import Cache from . import background_updates @@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._client_ip_looper = self._clock.looping_call( self._update_client_ips_batch, 5 * 1000 ) - reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0350ee5fe..c4a0208ce4 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -16,7 +16,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction from twisted.internet import defer -from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks import logging @@ -800,7 +799,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) if caught_up: break - yield sleep(5) + yield self.hs.get_clock().sleep(5) finally: self._doing_notif_rotation = False diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 32d9d00ffb..f6a6e46b43 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -14,13 +14,14 @@ # limitations under the License. from ._base import SQLBaseStore -from twisted.internet import defer, reactor +from twisted.internet import defer from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background, + LoggingContext, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -145,6 +146,9 @@ class EventsWorkerStore(SQLBaseStore): missing_events_ids = [e for e in event_ids if e not in event_entry_map] if missing_events_ids: + log_ctx = LoggingContext.current_context() + log_ctx.record_event_fetch(len(missing_events_ids)) + missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, @@ -265,7 +269,7 @@ class EventsWorkerStore(SQLBaseStore): except Exception: logger.exception("Failed to callback") with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list, row_dict) + self.hs.get_reactor().callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -278,7 +282,7 @@ class EventsWorkerStore(SQLBaseStore): if event_list: with PreserveLoggingContext(): - reactor.callFromThread(fire, event_list) + self.hs.get_reactor().callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index c241167fbe..9c9cf46e7f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore, defer.returnValue(ret['user_id']) defer.returnValue(None) - def user_delete_threepids(self, user_id): - return self._simple_delete( - "user_threepids", - keyvalues={ - "user_id": user_id, - }, - desc="user_delete_threepids", - ) - def user_delete_threepid(self, user_id, medium, address): return self._simple_delete( "user_threepids", diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7bfc3d91b5..48a88f755e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - txn.call_after(self.was_forgotten_at.invalidate_all) txn.call_after(self.did_forget.invalidate, (user_id, room_id)) self._invalidate_cache_and_stream( txn, self.who_forgot_in_room, (room_id,) @@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) - @cachedInlineCallbacks(num_args=3) - def was_forgotten_at(self, user_id, room_id, event_id): - """Returns whether user_id has elected to discard history for room_id at - event_id. - - event_id must be a membership event.""" - def f(txn): - sql = ( - "SELECT" - " forgotten" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " event_id = ?" - ) - txn.execute(sql, (user_id, room_id, event_id)) - rows = txn.fetchall() - return rows[0][0] - forgot = yield self.runInteraction("did_forget_membership_at", f) - defer.returnValue(forgot == 1) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 85c8fffc19..986a20400c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine -from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR +from synapse.util.caches import intern_string, get_cache_factor_for from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.stringutils import to_ascii @@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore): super(StateGroupWorkerStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR + "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") ) @cached(max_entries=100000, iterable=True) @@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore): for typ in types: if typ[1] is None: where_clauses.append("(type = ?)") - where_args.extend(typ[0]) + where_args.append(typ[0]) wildcard_types = True else: where_clauses.append("(type = ? AND state_key = ?)") |