From 9c272da05fcf51534aaa877647bc3b82bf841cf3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 May 2016 13:42:44 +0100 Subject: Add an openidish mechanism for proving to third parties that you own a given user_id --- synapse/storage/__init__.py | 4 +++- synapse/storage/openid.py | 32 ++++++++++++++++++++++++++++++ synapse/storage/schema/delta/32/openid.sql | 9 +++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/openid.py create mode 100644 synapse/storage/schema/delta/32/openid.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7122b0cbb1..d970fde9e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644 index 0000000000..5dabb607bd --- /dev/null +++ b/synapse/storage/openid.py @@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644 index 0000000000..36f37b11c8 --- /dev/null +++ b/synapse/storage/schema/delta/32/openid.sql @@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); -- cgit 1.5.1 From 56b5e83e36f22f3eab1ebf7a46b9f23f0c1a3e8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:20:18 +0100 Subject: Reduce database inserts when sending transactions --- synapse/handlers/presence.py | 2 +- synapse/storage/transactions.py | 157 +++++++++++++++++++++++++++++----------- 2 files changed, 114 insertions(+), 45 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328b..639567953a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30 * 1000, self.clock.looping_call, self._handle_timeouts, 5000, diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0a..17fc601983 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,54 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +146,28 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row + + # TODO: Fetch prev_txns - next_id = self._transaction_id_gen.get_next() + return self.runInteraction( + "prep_send_transaction", + self._get_prevs_txn, + destination, + ) + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +182,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +193,21 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +331,46 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") -- cgit 1.5.1 From 1d275dba69272f6df5a79871dc4b8d31e71514d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:25:58 +0100 Subject: Don't needlessly enter transaction --- synapse/storage/transactions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 17fc601983..ba1969b243 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -159,10 +159,8 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - # TODO: Fetch prev_txns - return self.runInteraction( - "prep_send_transaction", + "_get_prevs_txn", self._get_prevs_txn, destination, ) @@ -350,11 +348,12 @@ class TransactionStore(SQLBaseStore): ] def f(txn): - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) for dest, txn_map in update_delivered.items(): for txn_id, update_row in txn_map.items(): @@ -371,6 +370,7 @@ class TransactionStore(SQLBaseStore): } ) - yield self.runInteraction("_persist_in_mem_txns", f) + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") -- cgit 1.5.1 From d13459636fab9637a43c950d17f883ca77b832f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:30:55 +0100 Subject: Pull prev txn from in memory --- synapse/storage/transactions.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ba1969b243..6c7481a728 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -58,6 +58,8 @@ class TransactionStore(SQLBaseStore): # Newly delivered transactions that *were* persisted while in flight self.update_delivered_transactions = {} + self.last_transaction = {} + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) hs.get_clock().looping_call( self._persist_in_mem_txns, @@ -159,11 +161,15 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. @@ -196,6 +202,8 @@ class TransactionStore(SQLBaseStore): destination, {} ).pop(transaction_id, None) + self.last_transaction[destination] = transaction_id + if txn_row: d = self.new_delivered_transactions.setdefault(destination, {}) d[transaction_id] = txn_row._replace( -- cgit 1.5.1 From b6e0be701eb8ae36236242549ef356397f5f1d06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 14:31:38 +0100 Subject: Queue events for persistence --- synapse/storage/events.py | 155 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 144 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0307b2af3c..07e873fce4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,12 +19,14 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json -from collections import namedtuple +from collections import deque, namedtuple + import logging import math @@ -50,6 +52,80 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +class _EventPeristenceQueue(object): + """Queues up events so that they can be persisted in bulk with only one + concurrent transaction per room. + """ + + _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( + "events_and_contexts", "current_state", "backfilled", "deferred", + )) + + def __init__(self): + self._event_persist_queues = {} + self._currently_persisting_rooms = set() + + def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state): + """Add events to the queue, with the given persist_event options. + """ + queue = self._event_persist_queues.setdefault(room_id, deque()) + if queue: + end_item = queue[-1] + if end_item.current_state or current_state: + # We perist events with current_state set to True one at a time + pass + if end_item.backfilled == backfilled: + end_item.events_and_contexts.extend(events_and_contexts) + return end_item.deferred.observe() + + deferred = ObservableDeferred(defer.Deferred()) + + queue.append(self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + current_state=current_state, + deferred=deferred, + )) + + return deferred.observe() + + def handle_queue(self, room_id, callback): + """Attempts to handle the queue for a room if not already being handled. + + The given callback will be invoked with a 'queue' arg, which is a + generator over _EventPersistQueueItem's. The queue will finish if there + are no longer any items in the room queue. + + This function should therefore be called whenever anything is added + to the queue. + + If another callback is currently handling the queue then it will not be + invoked. + """ + + if room_id in self._currently_persisting_rooms: + return + + self._currently_persisting_rooms.add(room_id) + + try: + callback(self._get_drainining_queue(room_id)) + finally: + self._currently_persisting_rooms.discard(room_id) + + def _get_drainining_queue(self, room_id): + queue = self._event_persist_queues.pop(room_id, None) + if not queue: + return + + try: + while True: + yield queue.popleft() + except IndexError: + # Queue has been drained. + pass + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" @@ -59,19 +135,80 @@ class EventsStore(SQLBaseStore): self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) - @defer.inlineCallbacks + self._event_persist_queue = _EventPeristenceQueue() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) backfilled: ? + """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) + + deferreds = [] + for room_id, evs_ctxs in partitioned.items(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + current_state=None, + ) + deferreds.append(d) - Returns: Tuple of stream_orderings where the first is the minimum and - last is the maximum stream ordering assigned to the events when - persisting. + for room_id in partitioned.keys(): + self._maybe_start_persisting(room_id) - """ + return defer.gatherResults(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, current_state=None, backfilled=False): + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, + current_state=current_state, + ) + + self._maybe_start_persisting(event.room_id) + + yield deferred + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(queue): + for item in queue: + try: + ret = None + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + logger.info("Resolving with ret: %r", ret) + item.deferred.callback(ret) + except Exception as e: + logger.exception("Failed to persist events") + item.deferred.errback(e) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + + @defer.inlineCallbacks + def _persist_events(self, events_and_contexts, backfilled=False): if not events_and_contexts: return @@ -118,8 +255,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None, backfilled=False): - + def _persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -136,9 +272,6 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((stream_ordering, max_persisted_id)) - @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, -- cgit 1.5.1 From fd85b167ecb650451bf8bbfc68b771f607bb1d9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 15:38:42 +0100 Subject: Pull loop one level up --- synapse/storage/events.py | 77 +++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 36 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 07e873fce4..9f8f0a0823 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.async import ObservableDeferred +from synapse.util.async import ObservableDeferred, run_on_reactor from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -89,12 +89,14 @@ class _EventPeristenceQueue(object): return deferred.observe() - def handle_queue(self, room_id, callback): + def handle_queue(self, room_id, per_item_callback): """Attempts to handle the queue for a room if not already being handled. - The given callback will be invoked with a 'queue' arg, which is a - generator over _EventPersistQueueItem's. The queue will finish if there - are no longer any items in the room queue. + The given callback will be invoked with for each item in the queue,1 + of type _EventPersistQueueItem. The per_item_callback will continuously + be called with new items, unless the queue becomnes empty. The return + value of the function will be given to the deferreds waiting on the item, + exceptions will be passed to the deferres as well. This function should therefore be called whenever anything is added to the queue. @@ -108,15 +110,26 @@ class _EventPeristenceQueue(object): self._currently_persisting_rooms.add(room_id) - try: - callback(self._get_drainining_queue(room_id)) - finally: - self._currently_persisting_rooms.discard(room_id) + @defer.inlineCallbacks + def handle_queue_loop(): + try: + queue = self._get_drainining_queue(room_id) + for item in queue: + try: + ret = yield per_item_callback(item) + item.deferred.callback(ret) + except Exception as e: + item.deferred.errback(e) + finally: + queue = self._event_persist_queues.pop(room_id, None) + if queue: + self._event_persist_queues[room_id] = queue + self._currently_persisting_rooms.discard(room_id) + + preserve_fn(handle_queue_loop)() def _get_drainining_queue(self, room_id): - queue = self._event_persist_queues.pop(room_id, None) - if not queue: - return + queue = self._event_persist_queues.setdefault(room_id, deque()) try: while True: @@ -180,30 +193,22 @@ class EventsStore(SQLBaseStore): def _maybe_start_persisting(self, room_id): @defer.inlineCallbacks - def persisting_queue(queue): - for item in queue: - try: - ret = None - if item.current_state: - for event, context in item.events_and_contexts: - # There should only ever be one item in - # events_and_contexts when current_state is - # not None - yield self._persist_event( - event, context, - current_state=item.current_state, - backfilled=item.backfilled, - ) - else: - yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, - ) - logger.info("Resolving with ret: %r", ret) - item.deferred.callback(ret) - except Exception as e: - logger.exception("Failed to persist events") - item.deferred.errback(e) + def persisting_queue(item): + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) self._event_persist_queue.handle_queue(room_id, persisting_queue) -- cgit 1.5.1 From fcb2c3f0db70593709c3bfbfc66772066c4aa061 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 15:47:40 +0100 Subject: Remove unused import --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9f8f0a0823..a27b7919cd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.async import ObservableDeferred, run_on_reactor +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes -- cgit 1.5.1 From 4ea762c1a28edbf18d0a183ed35fb8a5a11847c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 May 2016 10:08:21 +0100 Subject: Add cache to get_user_by_id --- synapse/storage/registration.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7af0cae6a5..bda84a744a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): -- cgit 1.5.1