diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..ba88a54979 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -15,50 +15,48 @@
# limitations under the License.
import datetime
-from dateutil import tz
-import time
import logging
+import time
+from dateutil import tz
+
+from synapse.api.constants import PresenceState
from synapse.storage.devices import DeviceStore
-from .appservice import (
- ApplicationServiceStore, ApplicationServiceTransactionStore
-)
+from synapse.storage.user_erasure_store import UserErasureStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from .account_data import AccountDataStore
+from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
+from .client_ips import ClientIpStore
+from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
+from .end_to_end_keys import EndToEndKeyStore
+from .engines import PostgresEngine
+from .event_federation import EventFederationStore
+from .event_push_actions import EventPushActionsStore
from .events import EventsStore
+from .filtering import FilteringStore
+from .group_server import GroupServerStore
+from .keys import KeyStore
+from .media_repository import MediaRepositoryStore
+from .openid import OpenIdStore
from .presence import PresenceStore, UserPresenceState
from .profile import ProfileStore
+from .push_rule import PushRuleStore
+from .pusher import PusherStore
+from .receipts import ReceiptsStore
from .registration import RegistrationStore
+from .rejections import RejectionsStore
from .room import RoomStore
from .roommember import RoomMemberStore
-from .stream import StreamStore
-from .transactions import TransactionStore
-from .keys import KeyStore
-from .event_federation import EventFederationStore
-from .pusher import PusherStore
-from .push_rule import PushRuleStore
-from .media_repository import MediaRepositoryStore
-from .rejections import RejectionsStore
-from .event_push_actions import EventPushActionsStore
-from .deviceinbox import DeviceInboxStore
-from .group_server import GroupServerStore
-from .state import StateStore
-from .signatures import SignatureStore
-from .filtering import FilteringStore
-from .end_to_end_keys import EndToEndKeyStore
-
-from .receipts import ReceiptsStore
from .search import SearchStore
+from .signatures import SignatureStore
+from .state import StateStore
+from .stream import StreamStore
from .tags import TagsStore
-from .account_data import AccountDataStore
-from .openid import OpenIdStore
-from .client_ips import ClientIpStore
+from .transactions import TransactionStore
from .user_directory import UserDirectoryStore
-
-from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
-from .engines import PostgresEngine
-
-from synapse.api.constants import PresenceState
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
logger = logging.getLogger(__name__)
@@ -88,6 +86,7 @@ class DataStore(RoomMemberStore, RoomStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
+ UserErasureStore,
):
def __init__(self, db_conn, hs):
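
The new UserErasureStore is mixed into DataStore alongside the other per-feature stores, so its methods appear directly on the single DataStore object. A minimal standalone sketch of that mixin-composition pattern (simplified stand-ins, not Synapse's real classes):

```python
# Simplified model of DataStore's store-mixin composition; these classes
# are illustrative stand-ins, not Synapse's real stores.

class SQLBaseStore(object):
    def __init__(self, db_conn, hs):
        self.db_conn = db_conn
        self.hs = hs

class RoomStore(SQLBaseStore):
    def get_room(self, room_id):
        return {"room_id": room_id}

class UserErasureStore(SQLBaseStore):
    def is_user_erased(self, user_id):
        # the real method queries the erased_users table (see below)
        return False

class DataStore(RoomStore, UserErasureStore):
    """One object exposing every mixed-in store's methods."""

store = DataStore(db_conn=None, hs=None)
print(store.get_room("!r:example.org"), store.is_user_erased("@u:example.org"))
```
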
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 22d6257a9f..1d41d8d445 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -13,22 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import sys
+import threading
+import time
-from synapse.api.errors import StoreError
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.util.caches.descriptors import Cache
-from synapse.storage.engines import PostgresEngine
+from six import iteritems, iterkeys, itervalues
+from six.moves import intern, range
from prometheus_client import Histogram
from twisted.internet import defer
-import sys
-import time
-import threading
-
-from six import itervalues, iterkeys, iteritems
-from six.moves import intern, range
+from synapse.api.errors import StoreError
+from synapse.storage.engines import PostgresEngine
+from synapse.util.caches.descriptors import Cache
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
@@ -221,7 +220,7 @@ class SQLBaseStore(object):
self._clock.looping_call(loop, 10000)
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
- logging_context, func, *args, **kwargs):
+ func, *args, **kwargs):
start = time.time()
txn_id = self._TXN_ID
@@ -285,8 +284,7 @@ class SQLBaseStore(object):
end = time.time()
duration = end - start
- if logging_context is not None:
- logging_context.add_database_transaction(duration)
+ LoggingContext.current_context().add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
@@ -310,19 +308,15 @@ class SQLBaseStore(object):
Returns:
Deferred: The result of func
"""
- current_context = LoggingContext.current_context()
-
after_callbacks = []
exception_callbacks = []
- def inner_func(conn, *args, **kwargs):
- return self._new_transaction(
- conn, desc, after_callbacks, exception_callbacks, current_context,
- func, *args, **kwargs
- )
-
try:
- result = yield self.runWithConnection(inner_func, *args, **kwargs)
+ result = yield self.runWithConnection(
+ self._new_transaction,
+ desc, after_callbacks, exception_callbacks, func,
+ *args, **kwargs
+ )
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
@@ -347,22 +341,25 @@ class SQLBaseStore(object):
Returns:
Deferred: The result of func
"""
- current_context = LoggingContext.current_context()
+ parent_context = LoggingContext.current_context()
+ if parent_context == LoggingContext.sentinel:
+ logger.warn(
+ "Starting db connection from sentinel context: metrics will be lost",
+ )
+ parent_context = None
start_time = time.time()
def inner_func(conn, *args, **kwargs):
- with LoggingContext("runWithConnection") as context:
+ with LoggingContext("runWithConnection", parent_context) as context:
sched_duration_sec = time.time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
- current_context.add_database_scheduled(sched_duration_sec)
+ context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
- current_context.copy_to(context)
-
return func(conn, *args, **kwargs)
with PreserveLoggingContext():
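
Instead of copying the caller's context into a fresh one inside the worker thread, runWithConnection now captures the parent context up front and makes the thread's "runWithConnection" context a child of it, so scheduling metrics roll up to the originating request; when the parent is the sentinel context the metrics have nowhere to go, hence the warning. A toy standalone model of the idea (hand-rolled classes, not Synapse's LoggingContext machinery):

```python
# Toy model of "capture the parent context, re-parent threadpool work".

class Context(object):
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.db_sched_sec = 0.0

    def add_database_scheduled(self, sec):
        # metrics recorded on the child roll up to the request's context
        self.db_sched_sec += sec
        if self.parent is not None:
            self.parent.add_database_scheduled(sec)

SENTINEL = Context("sentinel")

def run_with_connection(parent, func):
    if parent is SENTINEL:
        print("warning: db connection from sentinel context: metrics lost")
        parent = None
    ctx = Context("runWithConnection", parent)
    ctx.add_database_scheduled(0.01)  # pretend we waited 10ms for a thread
    return func()

request_ctx = Context("GET /sync")
run_with_connection(request_ctx, lambda: "result")
print(request_ctx.db_sched_sec)  # 0.01, attributed to the request
```
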
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index f83ff0454a..bbc3355c73 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -14,17 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
+import logging
+
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.util.id_generators import StreamIdGenerator
-
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
-
-import abc
-import simplejson as json
-import logging
logger = logging.getLogger(__name__)
@@ -114,25 +114,6 @@ class AccountDataWorkerStore(SQLBaseStore):
else:
defer.returnValue(None)
- @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
- num_args=2, list_name="user_ids", inlineCallbacks=True)
- def get_global_account_data_by_type_for_users(self, data_type, user_ids):
- rows = yield self._simple_select_many_batch(
- table="account_data",
- column="user_id",
- iterable=user_ids,
- keyvalues={
- "account_data_type": data_type,
- },
- retcols=("user_id", "content",),
- desc="get_global_account_data_by_type_for_users",
- )
-
- defer.returnValue({
- row["user_id"]: json.loads(row["content"]) if row["content"] else None
- for row in rows
- })
-
@cached(num_args=2)
def get_account_data_for_room(self, user_id, room_id):
"""Get all the client account_data for a user for a room.
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..9f12b360bc 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,14 +15,16 @@
# limitations under the License.
import logging
import re
-import simplejson as json
+
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore
-from ._base import SQLBaseStore
+
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 8af325a9f5..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,15 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.util.async
-from ._base import SQLBaseStore
-from . import engines
+import logging
+
+from canonicaljson import json
from twisted.internet import defer
-import simplejson as json
-import logging
+from synapse.metrics.background_process_metrics import run_as_background_process
+
+from . import engines
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -87,12 +89,16 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers = {}
self._all_done = False
- @defer.inlineCallbacks
def start_doing_background_updates(self):
- logger.info("Starting background schema updates")
+ run_as_background_process(
+ "background_updates", self._run_background_updates,
+ )
+ @defer.inlineCallbacks
+ def _run_background_updates(self):
+ logger.info("Starting background schema updates")
while True:
- yield synapse.util.async.sleep(
+ yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:
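
start_doing_background_updates no longer hands the loop's deferred back to the caller; it fires the loop through run_as_background_process, which runs it under its own name (and, in Synapse, its own logcontext and metrics). A rough sketch of the fire-and-forget shape, with a simplified stand-in for run_as_background_process:

```python
from twisted.internet import defer, reactor
from twisted.internet.task import deferLater

# Simplified stand-in for synapse's run_as_background_process: give the
# loop a name and report failures instead of dropping them. The real
# helper also sets up a logcontext and Prometheus metrics.
def run_as_background_process(name, func):
    def report_failure(failure):
        print("background process %r died: %s" % (name, failure))
    d = defer.maybeDeferred(func)
    d.addErrback(report_failure)
    return d

@defer.inlineCallbacks
def _run_background_updates():
    for _ in range(3):  # the real loop runs until all updates are done
        yield deferLater(reactor, 1.0, lambda: None)  # cf. hs.get_clock().sleep
        print("ran one batch of background updates")

def start_doing_background_updates():
    # fire-and-forget: callers no longer wait on the loop's deferred
    run_as_background_process("background_updates", _run_background_updates)

start_doing_background_updates()
reactor.callLater(4, reactor.stop)
reactor.run()
```
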
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ce338514e8..77ae10da3d 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,15 +15,15 @@
import logging
-from twisted.internet import defer, reactor
+from six import iteritems
-from ._base import Cache
-from . import background_updates
+from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
-from six import iteritems
-
+from . import background_updates
+from ._base import Cache
logger = logging.getLogger(__name__)
@@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
- reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+ self.hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", self._update_client_ips_batch
+ )
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
@@ -92,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
- to_update = self._batch_row_update
- self._batch_row_update = {}
- return self.runInteraction(
- "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+ def update():
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn,
+ to_update,
+ )
+
+ run_as_background_process(
+ "update_client_ips", update,
)
def _update_client_ips_batch_txn(self, txn, to_update):
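
Both hunks here replace ambient globals with handles owned by the homeserver: the module-level reactor import becomes self.hs.get_reactor(), and the batch flush runs under run_as_background_process. Fetching the reactor from hs is what lets tests substitute a fake clock; a sketch using Twisted's Clock (FakeHS is illustrative, not a Synapse class):

```python
from twisted.internet.task import Clock

class FakeHS(object):
    def __init__(self):
        self._reactor = Clock()

    def get_reactor(self):
        return self._reactor

hs = FakeHS()
fired = []
hs.get_reactor().callLater(5, fired.append, "client_ips batch flushed")
hs.get_reactor().advance(5)  # the test, not wall-clock time, drives the timer
print(fired)  # ['client_ips batch flushed']
```
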
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index a879e5bfc1..73646da025 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -14,14 +14,14 @@
# limitations under the License.
import logging
-import simplejson
-from twisted.internet import defer
+from canonicaljson import json
-from .background_updates import BackgroundUpdateStore
+from twisted.internet import defer
from synapse.util.caches.expiringcache import ExpiringCache
+from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
@@ -85,7 +85,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
)
rows = []
for destination, edu in remote_messages_by_destination.items():
- edu_json = simplejson.dumps(edu)
+ edu_json = json.dumps(edu)
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)
@@ -177,7 +177,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
" WHERE user_id = ?"
)
txn.execute(sql, (user_id,))
- message_json = simplejson.dumps(messages_by_device["*"])
+ message_json = json.dumps(messages_by_device["*"])
for row in txn:
# Add the message for all devices for this user on this
# server.
@@ -199,7 +199,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
# Only insert into the local inbox if the device exists on
# this server
device = row[0]
- message_json = simplejson.dumps(messages_by_device[device])
+ message_json = json.dumps(messages_by_device[device])
messages_json_for_user[device] = message_json
if messages_json_for_user:
@@ -253,7 +253,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
- messages.append(simplejson.loads(row[1]))
+ messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)
@@ -389,7 +389,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
- messages.append(simplejson.loads(row[1]))
+ messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d149d8392e..ec68e39f1e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import simplejson as json
+
+from six import iteritems, itervalues
+
+from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
-from ._base import SQLBaseStore, Cache
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from six import itervalues, iteritems
+from ._base import Cache, SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index d0c0059757..808194236a 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached
-
-from synapse.api.errors import SynapseError
+from collections import namedtuple
from twisted.internet import defer
-from collections import namedtuple
+from synapse.api.errors import SynapseError
+from synapse.util.caches.descriptors import cached
+from ._base import SQLBaseStore
RoomAliasMapping = namedtuple(
"RoomAliasMapping",
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b146487943..7ae5c65482 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,17 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import iteritems
+
+from canonicaljson import encode_canonical_json, json
+
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
-from canonicaljson import encode_canonical_json
-import simplejson as json
-
from ._base import SQLBaseStore
-from six import iteritems
-
class EndToEndKeyStore(SQLBaseStore):
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 8c868ece75..e2f9de8451 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -13,13 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import IncorrectDatabaseSetup
-from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
-
import importlib
import platform
+from ._base import IncorrectDatabaseSetup
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 60f0fa7fb3..19949fc474 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import prepare_database
-
import struct
import threading
+from synapse.storage.prepare_database import prepare_database
+
class Sqlite3Engine(object):
single_threaded = True
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8fbf7ffba7..8d366d1b91 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,23 +12,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import random
+from six.moves import range
+from six.moves.queue import Empty, PriorityQueue
+
+from unpaddedbase64 import encode_base64
+
from twisted.internet import defer
+from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
-
-from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
-from unpaddedbase64 import encode_base64
-
-import logging
-from six.moves.queue import PriorityQueue, Empty
-
-from six.moves import range
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..29b511ae5e 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,16 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage._base import SQLBaseStore, LoggingTransaction
-from twisted.internet import defer
-from synapse.util.async import sleep
-from synapse.util.caches.descriptors import cachedInlineCallbacks
-
import logging
-import simplejson as json
from six import iteritems
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
logger = logging.getLogger(__name__)
@@ -84,6 +85,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
+ self._rotate_delay = 3
+ self._rotate_count = 10000
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@@ -800,7 +803,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
- yield sleep(5)
+ yield self.hs.get_clock().sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@@ -821,8 +824,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
- ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
- """, (old_rotate_stream_ordering,))
+ ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
+ """, (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row
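
Two tuning knobs replace literals here: the inter-batch sleep (_rotate_delay) and the batch size (_rotate_count), with the latter now bound as a SQL parameter instead of being baked into the statement. SQLite and Postgres both accept LIMIT/OFFSET as bound parameters; a self-contained illustration of the new query shape:

```python
import sqlite3

# The rotation batch size is bound as a parameter ("OFFSET ?") instead of
# the previously inlined literal (50000).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_push_actions (stream_ordering INTEGER)")
conn.executemany(
    "INSERT INTO event_push_actions VALUES (?)",
    [(i,) for i in range(100)],
)

old_rotate_stream_ordering = 20
rotate_count = 10  # cf. self._rotate_count
row = conn.execute(
    "SELECT stream_ordering FROM event_push_actions"
    " WHERE stream_ordering > ?"
    " ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?",
    (old_rotate_stream_ordering, rotate_count),
).fetchone()
print(row)  # (31,): the upper bound for this rotation batch
```
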
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cb1082e864..4ff0fdc4ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,36 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import OrderedDict, deque, namedtuple
-from functools import wraps
import itertools
import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+
+from six import iteritems, itervalues
+from six.moves import range
+
+from canonicaljson import json
+from prometheus_client import Counter
-import simplejson as json
from twisted.internet import defer
+import synapse.metrics
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
+from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable,
-)
+from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id, RoomStreamToken
-import synapse.metrics
-
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
-
-from six.moves import range
-from six import itervalues, iteritems
-
-from prometheus_client import Counter
logger = logging.getLogger(__name__)
@@ -158,11 +156,8 @@ class _EventPeristenceQueue(object):
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -800,7 +795,8 @@ class EventsStore(EventsWorkerStore):
]
)
- self._curr_state_delta_stream_cache.entity_has_changed(
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
)
@@ -1044,7 +1040,6 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
- "event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1064,6 +1059,14 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
+ for table in (
+ "event_push_actions",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
+ )
+
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,27 +12,29 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+import logging
+from collections import namedtuple
-from twisted.internet import defer, reactor
+from canonicaljson import json
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
-
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+ LoggingContext,
+ PreserveLoggingContext,
+ make_deferred_yieldable,
+ run_in_background,
)
from synapse.util.metrics import Measure
-from synapse.api.errors import SynapseError
-
-from collections import namedtuple
-
-import logging
-import simplejson as json
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
+ log_ctx = LoggingContext.current_context()
+ log_ctx.record_event_fetch(len(missing_events_ids))
+
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
@@ -218,32 +223,47 @@ class EventsWorkerStore(SQLBaseStore):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.
"""
- event_list = []
i = 0
while True:
- try:
- with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
- single_threaded = self.database_engine.single_threaded
- if single_threaded or i > EVENT_QUEUE_ITERATIONS:
- self._event_fetch_ongoing -= 1
- return
- else:
- self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
- i += 1
- continue
- i = 0
+ with self._event_fetch_lock:
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+
+ if not event_list:
+ single_threaded = self.database_engine.single_threaded
+ if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+ self._event_fetch_ongoing -= 1
+ return
+ else:
+ self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+ i += 1
+ continue
+ i = 0
+
+ self._fetch_event_list(conn, event_list)
+
+ def _fetch_event_list(self, conn, event_list):
+ """Handle a load of requests from the _event_fetch_list queue
+
+ Args:
+ conn (twisted.enterprise.adbapi.Connection): database connection
+ event_list (list[Tuple[list[str], Deferred]]):
+ The fetch requests. Each entry consists of a list of event
+ ids to be fetched, and a deferred to be completed once the
+ events have been fetched.
+
+ """
+ with Measure(self._clock, "_fetch_event_list"):
+ try:
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
rows = self._new_transaction(
- conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
+ conn, "do_fetch", [], [],
+ self._fetch_event_rows, event_ids,
)
row_dict = {
@@ -265,7 +285,7 @@ class EventsWorkerStore(SQLBaseStore):
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list, row_dict)
+ self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@@ -276,9 +296,8 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
d.errback(e)
- if event_list:
- with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list)
+ with PreserveLoggingContext():
+ self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
@@ -304,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
- with PreserveLoggingContext():
- self.runWithConnection(
- self._do_fetch
- )
+ run_as_background_process(
+ "fetch_events",
+ self.runWithConnection,
+ self._do_fetch,
+ )
logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 2e2763126d..2d5896c5b4 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from canonicaljson import encode_canonical_json, json
+
from twisted.internet import defer
-from ._base import SQLBaseStore
-from synapse.api.errors import SynapseError, Codes
+from synapse.api.errors import Codes, SynapseError
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from ._base import SQLBaseStore
class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index da05ccb027..592d1b4c2a 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -14,15 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import SynapseError
from ._base import SQLBaseStore
-import simplejson as json
-
-
# The category ID for the "default" category. We don't store as null in the
# database to avoid the fun of null != null
_DEFAULT_CATEGORY_ID = ""
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 0f13b61da8..f547977600 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -13,17 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+import hashlib
+import logging
-from twisted.internet import defer
import six
-import OpenSSL
from signedjson.key import decode_verify_key_bytes
-import hashlib
-import logging
+import OpenSSL
+from twisted.internet import defer
+
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index cf2aae0468..b290f834b3 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -20,7 +20,6 @@ import logging
import os
import re
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index f05d91cc58..a0c7a0dc87 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -13,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from collections import namedtuple
+
+from twisted.internet import defer
+
from synapse.api.constants import PresenceState
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util import batch_iter
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from collections import namedtuple
-from twisted.internet import defer
+from ._base import SQLBaseStore
class UserPresenceState(namedtuple("UserPresenceState",
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 8612bd5ecc..60295da254 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -15,8 +15,8 @@
from twisted.internet import defer
-from synapse.storage.roommember import ProfileInfo
from synapse.api.errors import StoreError
+from synapse.storage.roommember import ProfileInfo
from ._base import SQLBaseStore
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 04a0b59a39..be655d287b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,20 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+import abc
+import logging
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes
+from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
from synapse.storage.receipts import ReceiptsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.push.baserules import list_with_base_rules
-from synapse.api.constants import EventTypes
-from twisted.internet import defer
-import abc
-import logging
-import simplejson as json
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 307660b99a..cc273a57b2 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -14,16 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from twisted.internet import defer
+import logging
+import types
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
-import logging
-import simplejson as json
-import types
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c93c228f6e..0ac665e967 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -14,17 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from .util.id_generators import StreamIdGenerator
-from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+import abc
+import logging
+
+from canonicaljson import json
from twisted.internet import defer
-import abc
-import logging
-import simplejson as json
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from ._base import SQLBaseStore
+from .util.id_generators import StreamIdGenerator
logger = logging.getLogger(__name__)
@@ -139,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""
room_ids = set(room_ids)
- if from_key:
+ if from_key is not None:
+ # Only ask the database about rooms where there have been new
+ # receipts added since `from_key`
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
@@ -150,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
- @cachedInlineCallbacks(num_args=3, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@@ -161,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
from the start.
Returns:
- list: A list of receipts.
+ Deferred[list]: A list of receipts.
+ """
+ if from_key is not None:
+ # Check the cache first to see if any new receipts have been added
+            # since `from_key`. If not we can no-op.
+            if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
+                return defer.succeed([])
+
+ return self._get_linearized_receipts_for_room(room_id, to_key, from_key)
+
+ @cachedInlineCallbacks(num_args=3, tree=True)
+ def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
+ """See get_linearized_receipts_for_room
"""
def f(txn):
if from_key:
@@ -210,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"content": content,
}])
- @cachedList(cached_method_name="get_linearized_receipts_for_room",
+ @cachedList(cached_method_name="_get_linearized_receipts_for_room",
list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
@@ -372,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(
self._receipts_stream_cache.entity_has_changed,
@@ -492,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
self._simple_delete_txn(
txn,
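
The public method keeps its signature but loses its cache decorator: it first asks the receipts stream-change cache whether anything in the room changed after from_key and returns an empty result if not, then delegates to the newly cached _get_linearized_receipts_for_room (which the @cachedList and invalidation sites now reference). A minimal model of that check-the-change-cache-before-the-query-cache split, with plain dicts standing in for Synapse's cache classes:

```python
class Receipts(object):
    def __init__(self):
        self._last_change = {}  # room_id -> stream id of newest receipt
        self._query_cache = {}  # (room_id, to_key, from_key) -> result

    def has_entity_changed(self, room_id, from_key):
        return self._last_change.get(room_id, 0) > from_key

    def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
        if from_key is not None:
            # nothing new since from_key: skip the query cache entirely
            if not self.has_entity_changed(room_id, from_key):
                return []
        return self._get_linearized_receipts_for_room(room_id, to_key, from_key)

    def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
        key = (room_id, to_key, from_key)
        if key not in self._query_cache:
            self._query_cache[key] = ["<rows from the database>"]
        return self._query_cache[key]

r = Receipts()
r._last_change["!r:x"] = 7
print(r.get_linearized_receipts_for_room("!r:x", to_key=9, from_key=8))  # []
print(r.get_linearized_receipts_for_room("!r:x", to_key=9, from_key=5))  # query
```
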
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c241167fbe..07333f777d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,15 +15,15 @@
import re
+from six.moves import range
+
from twisted.internet import defer
-from synapse.api.errors import StoreError, Codes
+from synapse.api.errors import Codes, StoreError
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from six.moves import range
-
class RegistrationWorkerStore(SQLBaseStore):
@cached()
@@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
defer.returnValue(ret['user_id'])
defer.returnValue(None)
- def user_delete_threepids(self, user_id):
- return self._simple_delete(
- "user_threepids",
- keyvalues={
- "user_id": user_id,
- },
- desc="user_delete_threepids",
- )
-
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
@@ -632,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore,
        Removes the given user from the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
- return self._simple_delete_one(
+ # XXX: This should be simple_delete_one but we failed to put a unique index on
+ # the table, so somehow duplicate entries have ended up in it.
+ return self._simple_delete(
"users_pending_deactivation",
keyvalues={
"user_id": user_id,
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
index 40acb5c4ed..880f047adb 100644
--- a/synapse/storage/rejections.py
+++ b/synapse/storage/rejections.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-
import logging
+from ._base import SQLBaseStore
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index ea6a189185..3147fb6827 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -13,6 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
+import logging
+import re
+
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import StoreError
@@ -20,11 +26,6 @@ from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-import collections
-import logging
-import simplejson as json
-import re
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 48a88f755e..02a802bed9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -14,24 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
+import logging
from collections import namedtuple
+from six import iteritems, itervalues
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
from synapse.storage.events import EventsWorkerStore
+from synapse.types import get_domain_from_id
from synapse.util.async import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.stringutils import to_ascii
-from synapse.api.constants import Membership, EventTypes
-from synapse.types import get_domain_from_id
-
-import logging
-import simplejson as json
-
-from six import itervalues, iteritems
-
logger = logging.getLogger(__name__)
@@ -455,7 +454,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(joined_hosts)
- @cached(max_entries=10000, iterable=True)
+ @cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index e7351c3ae6..4b2ffd35fd 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -14,11 +14,11 @@
import logging
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-
import simplejson
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index 6df57b5206..414f9f5aa0 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -14,10 +14,10 @@
import logging
-from synapse.storage.prepare_database import get_statements
-
import simplejson
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 85bd1a2006..ef7ec34346 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from synapse.config.appservice import load_appservices
from six.moves import range
+from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index fe6b7d196d..7d8ca5f93f 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.prepare_database import get_statements
-
import logging
+
import simplejson
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py
index 1e002f9db2..bff1256a7b 100644
--- a/synapse/storage/schema/delta/33/event_fields.py
+++ b/synapse/storage/schema/delta/33/event_fields.py
@@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import get_statements
-
import logging
+
import simplejson
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 55ae43f395..9754d3ccfb 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -14,7 +14,6 @@
import time
-
ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
index 3b63a1562d..cf09e43e2b 100644
--- a/synapse/storage/schema/delta/34/cache_stream.py
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py
index 033144341c..67d505e68b 100644
--- a/synapse/storage/schema/delta/34/received_txn_purge.py
+++ b/synapse/storage/schema/delta/34/received_txn_purge.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/34/sent_txn_purge.py b/synapse/storage/schema/delta/34/sent_txn_purge.py
index 81948e3431..0ffab10b6f 100644
--- a/synapse/storage/schema/delta/34/sent_txn_purge.py
+++ b/synapse/storage/schema/delta/34/sent_txn_purge.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 20ad8bd5a6..a377884169 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.prepare_database import get_statements
-from synapse.storage.engines import PostgresEngine
-
import logging
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
logger = logging.getLogger(__name__)
DROP_INDICES = """
diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py
index ea6a18196d..506f326f4d 100644
--- a/synapse/storage/schema/delta/42/user_dir.py
+++ b/synapse/storage/schema/delta/42/user_dir.py
@@ -14,8 +14,8 @@
import logging
-from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql
new file mode 100644
index 0000000000..5d8641a9ab
--- /dev/null
+++ b/synapse/storage/schema/delta/50/erasure_store.sql
@@ -0,0 +1,21 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a table of users who have requested that their details be erased
+CREATE TABLE erased_users (
+ user_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f0fa5d7631..d5b5df93e6 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -13,19 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
import logging
import re
-import simplejson as json
+from collections import namedtuple
from six import string_types
+from canonicaljson import json
+
from twisted.internet import defer
-from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from .background_updates import BackgroundUpdateStore
+
logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 25922e5a9c..470212aa2a 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
import six
-from ._base import SQLBaseStore
-
from unpaddedbase64 import encode_base64
+
+from twisted.internet import defer
+
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
+from ._base import SQLBaseStore
+
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b452813fbb..c5ff44fef7 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
import logging
+from collections import namedtuple
from six import iteritems, itervalues
from six.moves import range
@@ -23,10 +23,11 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, get_cache_factor_for
+from synapse.util.caches import get_cache_factor_for, intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
+
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -585,19 +586,24 @@ class StateGroupWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_state_for_groups(self, groups, types=None):
- """Given list of groups returns dict of group -> list of state events
- with matching types.
+ """Gets the state at each of a list of state groups, optionally
+ filtering by type/state_key
Args:
- groups(list[int]): list of groups whose state to query
- types(list[str|None, str|None]|None): List of 2-tuples of the form
- (`type`, `state_key`), where a `state_key` of `None` matches all
- state_keys for the `type`. Presence of type of `None` indicates
- that types not in the list should not be filtered out. If None,
- all events are returned.
+ groups (iterable[int]): list of state groups for which we want
+ to get the state.
+ types (None|iterable[(None|str, None|str)]):
+ indicates the state type/keys required. If None, the whole
+ state is fetched and returned.
+
+ Otherwise, each entry should be a `(type, state_key)` tuple to
+ include in the response. A `state_key` of None is a wildcard
+ meaning that we require all state with that type. A `type` of None
+ indicates that types not in the list should not be filtered out.
Returns:
- dict of group -> list of state events
+ Deferred[dict[int, dict[(type, state_key), EventBase]]]
+ a dictionary mapping from state group to state dictionary.
"""
if types:
types = frozenset(types)
@@ -606,7 +612,7 @@ class StateGroupWorkerStore(SQLBaseStore):
if types is not None:
for group in set(groups):
state_dict_ids, _, got_all = self._get_some_state_from_cache(
- group, types
+ group, types,
)
results[group] = state_dict_ids
@@ -627,26 +633,40 @@ class StateGroupWorkerStore(SQLBaseStore):
# Okay, so we have some missing_types, lets fetch them.
cache_seq_num = self._state_group_cache.sequence
+ # the DictionaryCache knows if it has *all* the state, but
+ # does not know if it has all of the keys of a particular type,
+ # which makes wildcard lookups expensive unless we have a complete
+ # cache. Hence, if we are doing a wildcard lookup, populate the
+ # cache fully so that we can do an efficient lookup next time.
+
+ if types and any(k is None for (t, k) in types):
+ types_to_fetch = None
+ else:
+ types_to_fetch = types
+
group_to_state_dict = yield self._get_state_groups_from_groups(
- missing_groups, types
+ missing_groups, types_to_fetch,
)
- # Now we want to update the cache with all the things we fetched
- # from the database.
for group, group_state_dict in iteritems(group_to_state_dict):
state_dict = results[group]
- state_dict.update(
- ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
- for k, v in iteritems(group_state_dict)
- )
-
+ # update the result, filtering by `types`.
+ if types:
+ for k, v in iteritems(group_state_dict):
+ (typ, _) = k
+ if k in types or (typ, None) in types:
+ state_dict[k] = v
+ else:
+ state_dict.update(group_state_dict)
+
+ # update the cache with all the things we fetched from the
+ # database.
self._state_group_cache.update(
cache_seq_num,
key=group,
- value=state_dict,
- full=(types is None),
- known_absent=types,
+ value=group_state_dict,
+ fetched_keys=types_to_fetch,
)
defer.returnValue(results)
@@ -753,7 +773,6 @@ class StateGroupWorkerStore(SQLBaseStore):
self._state_group_cache.sequence,
key=state_group,
value=dict(current_state_ids),
- full=True,
)
return state_group
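
The core of the state.py change: a wildcard (type, None) request cannot be answered confidently from a partially populated DictionaryCache, so any wildcard forces a full-state fetch; the caller's result is then filtered back down to `types`, while the cache is updated with everything fetched, with fetched_keys recording what was asked of the database. A self-contained sketch of that decision and filtering:

```python
def plan_fetch(types):
    """types: None, or iterable of (type, state_key); state_key None = wildcard."""
    if types and any(k is None for (_t, k) in types):
        return None  # fetch the group's full state, making the cache complete
    return types     # fetch exactly what was requested

def filter_for_caller(group_state_dict, types):
    if not types:
        return dict(group_state_dict)
    types = frozenset(types)
    return {
        k: v for k, v in group_state_dict.items()
        if k in types or (k[0], None) in types
    }

fetched = {("m.room.member", "@a:x"): "$ev1", ("m.room.name", ""): "$ev2"}
types = [("m.room.member", None)]    # wildcard on state_key
assert plan_fetch(types) is None     # full-state fetch, cached unfiltered
assert filter_for_caller(fetched, types) == {("m.room.member", "@a:x"): "$ev1"}
```
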
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..66856342f0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -33,22 +33,20 @@ what sort order was used:
and stream ordering columns respectively.
"""
+import abc
+import logging
+from collections import namedtuple
+
+from six.moves import range
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
from synapse.storage.events import EventsWorkerStore
-
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.storage.engines import PostgresEngine
-
-import abc
-import logging
-
-from six.moves import range
-from collections import namedtuple
-
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 6671d3cfca..0f657b2bd3 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -14,16 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage.account_data import AccountDataWorkerStore
-
-from synapse.util.caches.descriptors import cached
-from twisted.internet import defer
-
-import simplejson as json
import logging
from six.moves import range
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.storage.account_data import AccountDataWorkerStore
+from synapse.util.caches.descriptors import cached
+
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e485d19b84..c3bc94f56d 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,18 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached
+import logging
+from collections import namedtuple
-from twisted.internet import defer
import six
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
-from collections import namedtuple
+from twisted.internet import defer
-import logging
-import simplejson as json
+from synapse.util.caches.descriptors import cached
+
+from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 275c299998..a8781b0e5d 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -13,19 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
+import re
-from ._base import SQLBaseStore
+from six import iteritems
+
+from twisted.internet import defer
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from six import iteritems
-
-import re
-import logging
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -265,7 +265,7 @@ class UserDirectoryStore(SQLBaseStore):
self.get_user_in_public_room.invalidate((user_id,))
def get_users_in_public_due_to_room(self, room_id):
- """Get all user_ids that are in the room directory becuase they're
+ """Get all user_ids that are in the room directory because they're
in the given room_id
"""
return self._simple_select_onecol(
@@ -277,7 +277,7 @@ class UserDirectoryStore(SQLBaseStore):
@defer.inlineCallbacks
def get_users_in_dir_due_to_room(self, room_id):
- """Get all user_ids that are in the room directory becuase they're
+ """Get all user_ids that are in the room directory because they're
in the given room_id
"""
user_ids_dir = yield self._simple_select_onecol(
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
new file mode 100644
index 0000000000..be013f4427
--- /dev/null
+++ b/synapse/storage/user_erasure_store.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import operator
+
+from twisted.internet import defer
+
+from synapse.storage._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached, cachedList
+
+
+class UserErasureWorkerStore(SQLBaseStore):
+ @cached()
+ def is_user_erased(self, user_id):
+ """
+ Check if the given user id has requested erasure
+
+ Args:
+ user_id (str): full user id to check
+
+ Returns:
+ Deferred[bool]: True if the user has requested erasure
+ """
+ return self._simple_select_onecol(
+ table="erased_users",
+ keyvalues={"user_id": user_id},
+ retcol="1",
+ desc="is_user_erased",
+ ).addCallback(operator.truth)
+
+ @cachedList(
+ cached_method_name="is_user_erased",
+ list_name="user_ids",
+ inlineCallbacks=True,
+ )
+ def are_users_erased(self, user_ids):
+ """
+ Checks which users in a list have requested erasure
+
+ Args:
+ user_ids (iterable[str]): full user id to check
+
+ Returns:
+ Deferred[dict[str, bool]]:
+ for each user, whether the user has requested erasure.
+ """
+ # this serves the dual purpose of (a) making sure we can do len and
+ # iterate it multiple times, and (b) avoiding duplicates.
+ user_ids = tuple(set(user_ids))
+
+ def _get_erased_users(txn):
+ txn.execute(
+ "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % (
+ ",".join("?" * len(user_ids))
+ ),
+ user_ids,
+ )
+ return set(r[0] for r in txn)
+
+ erased_users = yield self.runInteraction(
+ "are_users_erased", _get_erased_users,
+ )
+ res = dict((u, u in erased_users) for u in user_ids)
+ defer.returnValue(res)
+
+
+class UserErasureStore(UserErasureWorkerStore):
+ def mark_user_erased(self, user_id):
+ """Indicate that user_id wishes their message history to be erased.
+
+ Args:
+ user_id (str): full user_id to be erased
+ """
+ def f(txn):
+ # first check if they are already in the list
+ txn.execute(
+ "SELECT 1 FROM erased_users WHERE user_id = ?",
+ (user_id, )
+ )
+ if txn.fetchone():
+ return
+
+ # they are not already there: do the insert.
+ txn.execute(
+ "INSERT INTO erased_users (user_id) VALUES (?)",
+ (user_id, )
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.is_user_erased, (user_id,)
+ )
+ return self.runInteraction("mark_user_erased", f)
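
For callers, the worker store exposes a cached per-user check and a batched variant wired up with @cachedList. A hedged usage sketch (`store`, `events` and `prune_event` are assumed names for this illustration, not APIs introduced by the diff above):

```python
from twisted.internet import defer

@defer.inlineCallbacks
def censor_erased_senders(store, events, prune_event):
    # batch-check erasure before serving a set of events
    erased = yield store.are_users_erased(e.sender for e in events)
    defer.returnValue([
        prune_event(e) if erased.get(e.sender) else e
        for e in events
    ])
```
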
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 95031dc9ec..d6160d5e4d 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import deque
import contextlib
import threading
+from collections import deque
class IdGenerator(object):
|