diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 8af325a9f5..b7e9c716c8 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.util.async
from ._base import SQLBaseStore
from . import engines
@@ -92,7 +91,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.info("Starting background schema updates")
while True:
- yield synapse.util.async.sleep(
+ yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ce338514e8..968d2fed22 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,7 +15,7 @@
import logging
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from ._base import Cache
from . import background_updates
@@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
- reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+ self.hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", self._update_client_ips_batch
+ )
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..c4a0208ce4 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -16,7 +16,6 @@
from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
-from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
@@ -800,7 +799,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
- yield sleep(5)
+ yield self.hs.get_clock().sleep(5)
finally:
self._doing_notif_rotation = False
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..f6a6e46b43 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -14,13 +14,14 @@
# limitations under the License.
from ._base import SQLBaseStore
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+ LoggingContext,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@@ -145,6 +146,9 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
+ log_ctx = LoggingContext.current_context()
+ log_ctx.record_event_fetch(len(missing_events_ids))
+
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
@@ -265,7 +269,7 @@ class EventsWorkerStore(SQLBaseStore):
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list, row_dict)
+ self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@@ -278,7 +282,7 @@ class EventsWorkerStore(SQLBaseStore):
if event_list:
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list)
+ self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c241167fbe..9c9cf46e7f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
defer.returnValue(ret['user_id'])
defer.returnValue(None)
- def user_delete_threepids(self, user_id):
- return self._simple_delete(
- "user_threepids",
- keyvalues={
- "user_id": user_id,
- },
- desc="user_delete_threepids",
- )
-
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7bfc3d91b5..48a88f755e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
- txn.call_after(self.was_forgotten_at.invalidate_all)
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
txn, self.who_forgot_in_room, (room_id,)
@@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)
- @cachedInlineCallbacks(num_args=3)
- def was_forgotten_at(self, user_id, room_id, event_id):
- """Returns whether user_id has elected to discard history for room_id at
- event_id.
-
- event_id must be a membership event."""
- def f(txn):
- sql = (
- "SELECT"
- " forgotten"
- " FROM"
- " room_memberships"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- " AND"
- " event_id = ?"
- )
- txn.execute(sql, (user_id, room_id, event_id))
- rows = txn.fetchall()
- return rows[0][0]
- forgot = yield self.runInteraction("did_forget_membership_at", f)
- defer.returnValue(forgot == 1)
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 85c8fffc19..986a20400c 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
+from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
- "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
+ "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
for typ in types:
if typ[1] is None:
where_clauses.append("(type = ?)")
- where_args.extend(typ[0])
+ where_args.append(typ[0])
wildcard_types = True
else:
where_clauses.append("(type = ? AND state_key = ?)")
|