diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8228069271..629c110bed 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -94,7 +94,7 @@ class DataStore(RoomMemberStore, RoomStore,
stream_ordering = self.min_token
try:
- yield self._db_pool.runInteraction(
+ yield self.runInteraction(
self._persist_pdu_event_txn,
pdu=pdu,
event=event,
@@ -297,7 +297,7 @@ class DataStore(RoomMemberStore, RoomStore,
prev_state_pdu=prev_state_pdu,
)
- return self._db_pool.runInteraction(_snapshot)
+ return self.runInteraction(_snapshot)
class Snapshot(object):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 8037225079..8a36f0bc6a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -34,6 +34,10 @@ class SQLBaseStore(object):
self.event_factory = hs.get_event_factory()
self._clock = hs.get_clock()
+ def runInteraction(self, txn, *args, **kwargs):
+ """Wraps the .runInteraction() method on the underlying db_pool."""
+ return self._db_pool.runInteraction(txn, *args, **kwargs)
+
def cursor_to_dict(self, cursor):
"""Converts a SQL cursor into an list of dicts.
@@ -71,7 +75,7 @@ class SQLBaseStore(object):
else:
return cursor.fetchall()
- return self._db_pool.runInteraction(interaction)
+ return self.runInteraction(interaction)
def _execute_and_decode(self, query, *args):
return self._execute(self.cursor_to_dict, query, *args)
@@ -87,7 +91,7 @@ class SQLBaseStore(object):
values : dict of new column names and values for them
or_replace : bool; if True performs an INSERT OR REPLACE
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._simple_insert_txn, table, values, or_replace=or_replace
)
@@ -164,7 +168,7 @@ class SQLBaseStore(object):
txn.execute(sql, keyvalues.values())
return txn.fetchall()
- res = yield self._db_pool.runInteraction(func)
+ res = yield self.runInteraction(func)
defer.returnValue([r[0] for r in res])
@@ -187,7 +191,7 @@ class SQLBaseStore(object):
txn.execute(sql, keyvalues.values())
return self.cursor_to_dict(txn)
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _simple_update_one(self, table, keyvalues, updatevalues,
retcols=None):
@@ -255,7 +259,7 @@ class SQLBaseStore(object):
raise StoreError(500, "More than one row matched")
return ret
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _simple_delete_one(self, table, keyvalues):
"""Executes a DELETE query on the named table, expecting to delete a
@@ -276,7 +280,7 @@ class SQLBaseStore(object):
raise StoreError(404, "No row found")
if txn.rowcount > 1:
raise StoreError(500, "more than one row matched")
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _simple_max_id(self, table):
"""Executes a SELECT query on the named table, expecting to return the
@@ -294,7 +298,7 @@ class SQLBaseStore(object):
return 0
return max_id
- return self._db_pool.runInteraction(func)
+ return self.runInteraction(func)
def _parse_event_from_row(self, row_dict):
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
@@ -313,7 +317,7 @@ class SQLBaseStore(object):
)
def _parse_events(self, rows):
- return self._db_pool.runInteraction(self._parse_events_txn, rows)
+ return self.runInteraction(self._parse_events_txn, rows)
def _parse_events_txn(self, txn, rows):
events = [self._parse_event_from_row(r) for r in rows]
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 3cbce2d0a1..f770a82bcd 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -42,7 +42,7 @@ class PduStore(SQLBaseStore):
PduTuple: If the pdu does not exist in the database, returns None
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_pdu_tuple, pdu_id, origin
)
@@ -94,7 +94,7 @@ class PduStore(SQLBaseStore):
list: A list of PduTuples
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_current_state_for_context,
context
)
@@ -142,7 +142,7 @@ class PduStore(SQLBaseStore):
pdu_origin (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._mark_as_processed, pdu_id, pdu_origin
)
@@ -151,7 +151,7 @@ class PduStore(SQLBaseStore):
def get_all_pdus_from_context(self, context):
"""Get a list of all PDUs for a given context."""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_all_pdus_from_context, context,
)
@@ -178,7 +178,7 @@ class PduStore(SQLBaseStore):
Return:
list: A list of PduTuples
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_backfill, context, pdu_list, limit
)
@@ -239,7 +239,7 @@ class PduStore(SQLBaseStore):
txn
context (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_min_depth_for_context, context
)
@@ -345,7 +345,7 @@ class PduStore(SQLBaseStore):
bool
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._is_pdu_new,
pdu_id=pdu_id,
origin=origin,
@@ -498,7 +498,7 @@ class StatePduStore(SQLBaseStore):
)
def get_unresolved_state_tree(self, new_state_pdu):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_unresolved_state_tree, new_state_pdu
)
@@ -537,7 +537,7 @@ class StatePduStore(SQLBaseStore):
def update_current_state(self, pdu_id, origin, context, pdu_type,
state_key):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._update_current_state,
pdu_id, origin, context, pdu_type, state_key
)
@@ -576,7 +576,7 @@ class StatePduStore(SQLBaseStore):
PduEntry
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_current_state_pdu, context, pdu_type, state_key
)
@@ -638,7 +638,7 @@ class StatePduStore(SQLBaseStore):
PduIdTuple: A pdu that we are missing, or None if we have all the
pdus required to do the conflict resolution.
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_next_missing_pdu, new_pdu
)
@@ -682,7 +682,7 @@ class StatePduStore(SQLBaseStore):
Returns:
bool: True if the new_pdu clobbered the current state, False if not
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._handle_new_state, new_pdu
)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index fd762bc643..db20b1daa0 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -62,7 +62,7 @@ class RegistrationStore(SQLBaseStore):
Raises:
StoreError if the user_id could not be registered.
"""
- yield self._db_pool.runInteraction(self._register, user_id, token,
+ yield self.runInteraction(self._register, user_id, token,
password_hash)
def _register(self, txn, user_id, token, password_hash):
@@ -99,7 +99,7 @@ class RegistrationStore(SQLBaseStore):
Raises:
StoreError if no user was found.
"""
- user_id = yield self._db_pool.runInteraction(self._query_for_auth,
+ user_id = yield self.runInteraction(self._query_for_auth,
token)
defer.returnValue(user_id)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 017169ce00..5adf8cdf1b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -149,7 +149,7 @@ class RoomStore(SQLBaseStore):
defer.returnValue(None)
def get_power_level(self, room_id, user_id):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_power_level,
room_id, user_id,
)
@@ -182,7 +182,7 @@ class RoomStore(SQLBaseStore):
return None
def get_ops_levels(self, room_id):
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_ops_levels,
room_id,
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index b357dc3058..8cbc15356d 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -71,6 +71,11 @@ class RoomMemberStore(SQLBaseStore):
txn.execute(sql, (room_id, domain))
+ def store_room_member(self, user_id, room_id, event_id, membership):
+ return self.runInteraction(self._store_room_member_txn,
+ user_id, user_id, room_id, event_id, membership
+ )
+
@defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index aff6dc9855..8c766b8a00 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -286,7 +286,7 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret)
def get_room_events_max_id(self):
- return self._db_pool.runInteraction(self._get_room_events_max_id_txn)
+ return self.runInteraction(self._get_room_events_max_id_txn)
def _get_room_events_max_id_txn(self, txn):
txn.execute(
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 7467e1035b..ab4599b468 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -41,7 +41,7 @@ class TransactionStore(SQLBaseStore):
this transaction or a 2-tuple of (int, dict)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_received_txn_response, transaction_id, origin
)
@@ -72,7 +72,7 @@ class TransactionStore(SQLBaseStore):
response_json (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._set_received_txn_response,
transaction_id, origin, code, response_dict
)
@@ -104,7 +104,7 @@ class TransactionStore(SQLBaseStore):
list: A list of previous transaction ids.
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._prep_send_transaction,
transaction_id, destination, ts, pdu_list
)
@@ -159,7 +159,7 @@ class TransactionStore(SQLBaseStore):
code (int)
response_json (str)
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._delivered_txn,
transaction_id, destination, code, response_dict
)
@@ -184,7 +184,7 @@ class TransactionStore(SQLBaseStore):
Returns:
list: A list of `ReceivedTransactionsTable.EntryType`
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_transactions_after, transaction_id, destination
)
@@ -214,7 +214,7 @@ class TransactionStore(SQLBaseStore):
Returns
list: A list of PduTuple
"""
- return self._db_pool.runInteraction(
+ return self.runInteraction(
self._get_pdus_after_transaction,
transaction_id, destination
)
|