diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
import time
from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
from canonicaljson import json
from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
# psycopg2 on Python 2 returns buffer objects, which we need to cast to
# bytes to decode
- if PY2 and isinstance(db_content, buffer):
+ if PY2 and isinstance(db_content, builtins.buffer):
db_content = bytes(db_content)
# Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index cfb687cb53..61a029a53c 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -90,7 +90,7 @@ class DirectoryWorkerStore(SQLBaseStore):
class DirectoryStore(DirectoryWorkerStore):
@defer.inlineCallbacks
def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
- """ Creates an associatin between a room alias and room_id/servers
+ """ Creates an association between a room alias and room_id/servers
Args:
room_alias (RoomAlias)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 03cedf3a75..c780f55277 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -731,11 +732,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Ok, we need to defer to the state handler to resolve our state sets.
- def get_events(ev_ids):
- return self.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
-
state_groups = {
sg: state_groups_map[sg] for sg in new_state_groups
}
@@ -745,7 +741,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
- room_id, room_version, state_groups, events_map, get_events
+ room_id, room_version, state_groups, events_map,
+ state_res_store=StateResolutionStore(self)
)
defer.returnValue((res.state, None))
@@ -854,6 +851,27 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
+ # We want to store event_auth mappings for rejected events, as they're
+ # used in state res v2.
+ # This is only necessary if the rejected event appears in an accepted
+        # event's auth chain, but it's easier for now just to store them (and
+ # it doesn't take much storage compared to storing the entire event
+ # anyway).
+ self._simple_insert_many_txn(
+ txn,
+ table="event_auth",
+ values=[
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ }
+ for event, _ in events_and_contexts
+ for auth_id, _ in event.auth_events
+ if event.is_state()
+ ],
+ )
+
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
events_and_contexts = self._store_rejected_events_txn(
@@ -1329,21 +1347,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
txn, event.room_id, event.redacts
)
- self._simple_insert_many_txn(
- txn,
- table="event_auth",
- values=[
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "auth_id": auth_id,
- }
- for event, _ in events_and_contexts
- for auth_id, _ in event.auth_events
- if event.is_state()
- ],
- )
-
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
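
The change above stops each caller from building its own get_events closure and instead passes resolve_state_groups a StateResolutionStore wrapping the datastore, giving the state-resolution code one object through which to fetch whatever events it needs. A minimal sketch of that shape only, using invented names (SketchStateResolutionStore, InMemoryStore, get_events_by_id) rather than Synapse's real API:

    class SketchStateResolutionStore(object):
        """Illustrative adapter: binds a datastore once, instead of each caller
        supplying its own get_events callback."""

        def __init__(self, store):
            self.store = store

        def get_events(self, event_ids, allow_rejected=False):
            # State res v2 may need rejected events too (their auth data can
            # appear in an accepted event's auth chain), hence the flag.
            return self.store.get_events_by_id(
                event_ids, allow_rejected=allow_rejected,
            )


    class InMemoryStore(object):
        """Tiny stand-in datastore keyed on event_id, to keep the sketch runnable."""

        def __init__(self, events):
            self._events = events

        def get_events_by_id(self, event_ids, allow_rejected=False):
            return {eid: self._events[eid] for eid in event_ids if eid in self._events}


    if __name__ == "__main__":
        store = InMemoryStore({"$create:example.com": {"type": "m.room.create"}})
        res_store = SketchStateResolutionStore(store)
        print(res_store.get_events(["$create:example.com"]))
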
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 0fe8c8e24c..cf4104dc2e 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -33,19 +33,29 @@ class MonthlyActiveUsersStore(SQLBaseStore):
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
+ # Do not add more reserved users than the total allowable number
+ self._initialise_reserved_users(
+ dbconn.cursor(),
+ hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
+ )
- @defer.inlineCallbacks
- def initialise_reserved_users(self, threepids):
- store = self.hs.get_datastore()
+ def _initialise_reserved_users(self, txn, threepids):
+ """Ensures that reserved threepids are accounted for in the MAU table, should
+ be called on start up.
+
+ Args:
+ txn (cursor):
+ threepids (list[dict]): List of threepid dicts to reserve
+ """
reserved_user_list = []
- # Do not add more reserved users than the total allowable number
- for tp in threepids[:self.hs.config.max_mau_value]:
- user_id = yield store.get_user_id_by_threepid(
+ for tp in threepids:
+ user_id = self.get_user_id_by_threepid_txn(
+ txn,
tp["medium"], tp["address"]
)
if user_id:
- yield self.upsert_monthly_active_user(user_id)
+ self.upsert_monthly_active_user_txn(txn, user_id)
reserved_user_list.append(user_id)
else:
logger.warning(
@@ -55,8 +65,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def reap_monthly_active_users(self):
- """
- Cleans out monthly active user table to ensure that no stale
+ """Cleans out monthly active user table to ensure that no stale
entries exist.
Returns:
@@ -165,19 +174,44 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
+ """Updates or inserts the user into the monthly active user table, which
+        is used to track the current MAU usage of the server.
+
+ Args:
+ user_id (str): user to add/update
"""
- Updates or inserts monthly active user member
- Arguments:
- user_id (str): user to add/update
- Deferred[bool]: True if a new entry was created, False if an
- existing one was updated.
+ is_insert = yield self.runInteraction(
+ "upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
+ user_id
+ )
+
+ if is_insert:
+ self.user_last_seen_monthly_active.invalidate((user_id,))
+ self.get_monthly_active_count.invalidate(())
+
+ def upsert_monthly_active_user_txn(self, txn, user_id):
+ """Updates or inserts monthly active user member
+
+ Note that, after calling this method, it will generally be necessary
+ to invalidate the caches on user_last_seen_monthly_active and
+ get_monthly_active_count. We can't do that here, because we are running
+ in a database thread rather than the main thread, and we can't call
+ txn.call_after because txn may not be a LoggingTransaction.
+
+ Args:
+ txn (cursor):
+ user_id (str): user to add/update
+
+ Returns:
+ bool: True if a new entry was created, False if an
+ existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
- is_insert = yield self._simple_upsert(
- desc="upsert_monthly_active_user",
+ is_insert = self._simple_upsert_txn(
+ txn,
table="monthly_active_users",
keyvalues={
"user_id": user_id,
@@ -186,9 +220,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
"timestamp": int(self._clock.time_msec()),
},
)
- if is_insert:
- self.user_last_seen_monthly_active.invalidate((user_id,))
- self.get_monthly_active_count.invalidate(())
+
+ return is_insert
@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
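
The docstrings above spell out the split this file now follows: the *_txn variants do database work only (they may be handed a plain cursor, so they cannot hook cache invalidation onto txn.call_after), while the Deferred-returning wrapper invalidates the relevant caches once runInteraction has resolved. A self-contained sketch of that pattern, with invented names (SketchMAUStore, run_interaction, cached_count) and sqlite3 standing in for Synapse's database layer:

    import sqlite3


    class SketchMAUStore(object):
        """Illustrative only; not Synapse's MonthlyActiveUsersStore."""

        def __init__(self, conn):
            self.conn = conn
            self.cached_count = None  # stands in for a @cached get_monthly_active_count

        def run_interaction(self, func, *args):
            # Synapse runs this on a database thread; running it inline keeps
            # the sketch self-contained.
            txn = self.conn.cursor()
            try:
                return func(txn, *args)
            finally:
                self.conn.commit()
                txn.close()

        def upsert_monthly_active_user_txn(self, txn, user_id):
            # Database work only: no cache invalidation here, because the
            # cursor we are handed may not support call_after-style hooks.
            txn.execute(
                "INSERT OR IGNORE INTO monthly_active_users (user_id) VALUES (?)",
                (user_id,),
            )
            return txn.rowcount == 1  # True only if a new row was created

        def upsert_monthly_active_user(self, user_id):
            # Wrapper: run the txn function, then invalidate caches afterwards.
            is_insert = self.run_interaction(
                self.upsert_monthly_active_user_txn, user_id,
            )
            if is_insert:
                self.cached_count = None  # invalidate the cached MAU count


    if __name__ == "__main__":
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE monthly_active_users (user_id TEXT PRIMARY KEY)")
        store = SketchMAUStore(conn)
        store.upsert_monthly_active_user("@alice:example.com")
        store.upsert_monthly_active_user("@alice:example.com")  # second call is a no-op
        print(conn.execute("SELECT count(*) FROM monthly_active_users").fetchone()[0])
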
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26b429e307..80d76bf9d7 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -474,17 +474,44 @@ class RegistrationStore(RegistrationWorkerStore,
@defer.inlineCallbacks
def get_user_id_by_threepid(self, medium, address):
- ret = yield self._simple_select_one(
+ """Returns user id from threepid
+
+ Args:
+ medium (str): threepid medium e.g. email
+ address (str): threepid address e.g. me@example.com
+
+ Returns:
+ Deferred[str|None]: user id or None if no user id/threepid mapping exists
+ """
+ user_id = yield self.runInteraction(
+ "get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
+ medium, address
+ )
+ defer.returnValue(user_id)
+
+ def get_user_id_by_threepid_txn(self, txn, medium, address):
+ """Returns user id from threepid
+
+ Args:
+ txn (cursor):
+ medium (str): threepid medium e.g. email
+ address (str): threepid address e.g. me@example.com
+
+ Returns:
+ str|None: user id or None if no user id/threepid mapping exists
+ """
+ ret = self._simple_select_one_txn(
+ txn,
"user_threepids",
{
"medium": medium,
"address": address
},
- ['user_id'], True, 'get_user_id_by_threepid'
+ ['user_id'], True
)
if ret:
- defer.returnValue(ret['user_id'])
- defer.returnValue(None)
+ return ret['user_id']
+ return None
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
@@ -567,7 +594,7 @@ class RegistrationStore(RegistrationWorkerStore,
def _find_next_generated_user_id(txn):
txn.execute("SELECT name FROM users")
- regex = re.compile("^@(\d+):")
+ regex = re.compile(r"^@(\d+):")
found = set()
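
On the regex change just above: the compiled pattern is identical either way, since Python leaves the unrecognised escape \d untouched in a plain string literal, but that pass-through raises a DeprecationWarning from Python 3.6 onwards, so the raw-string spelling is the future-proof one. A quick check of what the pattern actually matches (the numeric localparts of generated user ids):

    import re

    # Same pattern as "^@(\d+):"; the raw literal just avoids relying on the
    # deprecated pass-through of an unknown escape sequence.
    user_id_re = re.compile(r"^@(\d+):")

    print(user_id_re.match("@1234:example.com").group(1))  # -> "1234"
    print(user_id_re.match("@alice:example.com"))          # -> None
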
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview