diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index dad3731b9b..bb14729a9d 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -46,11 +46,12 @@ from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.logging.opentracing import (
+from synapse.logging.tracing import (
+ Link,
SynapseTags,
- active_span,
- set_tag,
- start_active_span_follows_from,
+ get_active_span,
+ set_attribute,
+ start_active_span,
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -124,7 +125,7 @@ times_pruned_extremities = Counter(
class _PersistEventsTask:
"""A batch of events to persist."""
- name: ClassVar[str] = "persist_event_batch" # used for opentracing
+ name: ClassVar[str] = "persist_event_batch" # used for tracing
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
@@ -145,7 +146,7 @@ class _PersistEventsTask:
class _UpdateCurrentStateTask:
"""A room whose current state needs recalculating."""
- name: ClassVar[str] = "update_current_state" # used for opentracing
+ name: ClassVar[str] = "update_current_state" # used for tracing
def try_merge(self, task: "_EventPersistQueueTask") -> bool:
"""Deduplicates consecutive recalculations of current state."""
@@ -160,11 +161,11 @@ class _EventPersistQueueItem:
task: _EventPersistQueueTask
deferred: ObservableDeferred
- parent_opentracing_span_contexts: List = attr.ib(factory=list)
- """A list of opentracing spans waiting for this batch"""
+ parent_tracing_span_contexts: List = attr.ib(factory=list)
+ """A list of tracing spans waiting for this batch"""
- opentracing_span_context: Any = None
- """The opentracing span under which the persistence actually happened"""
+ tracing_span_context: Any = None
+ """The tracing span under which the persistence actually happened"""
_PersistResult = TypeVar("_PersistResult")
@@ -228,10 +229,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
)
queue.append(end_item)
- # also add our active opentracing span to the item so that we get a link back
- span = active_span()
+ # also add our active tracing span to the item so that we get a link back
+ span = get_active_span()
if span:
- end_item.parent_opentracing_span_contexts.append(span.context)
+ end_item.parent_tracing_span_contexts.append(span.get_span_context())
# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
@@ -239,9 +240,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
# wait for the queue item to complete
res = await make_deferred_yieldable(end_item.deferred.observe())
- # add another opentracing span which links to the persist trace.
- with start_active_span_follows_from(
- f"{task.name}_complete", (end_item.opentracing_span_context,)
+ # add another tracing span which links to the persist trace.
+ with start_active_span(
+ f"{task.name}_complete",
+ links=[Link(end_item.tracing_span_context)],
):
pass
@@ -272,13 +274,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
- with start_active_span_follows_from(
+ with start_active_span(
item.task.name,
- item.parent_opentracing_span_contexts,
- inherit_force_tracing=True,
- ) as scope:
- if scope:
- item.opentracing_span_context = scope.span.context
+ links=[
+ Link(span_context)
+ for span_context in item.parent_tracing_span_contexts
+ ],
+ ) as span:
+ if span:
+ item.tracing_span_context = span.get_span_context()
ret = await self._per_item_callback(room_id, item.task)
except Exception:
@@ -392,15 +396,15 @@ class EventsPersistenceStorageController:
partitioned.setdefault(event.room_id, []).append((event, ctx))
event_ids.append(event.event_id)
- set_tag(
+ set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
str(event_ids),
)
- set_tag(
+ set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)
- set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
+ set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
async def enqueue(
item: Tuple[str, List[Tuple[EventBase, EventContext]]]
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index f9ffd0e29e..5964835ea3 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -29,7 +29,7 @@ from typing import (
from synapse.api.constants import EventTypes
from synapse.events import EventBase
-from synapse.logging.opentracing import tag_args, trace
+from synapse.logging.tracing import tag_args, trace
from synapse.storage.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.storage.util.partial_state_events_tracker import (
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..ca0f606797 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -47,7 +47,7 @@ from twisted.internet.interfaces import IReactorCore
from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
-from synapse.logging import opentracing
+from synapse.logging import tracing
from synapse.logging.context import (
LoggingContext,
current_context,
@@ -422,11 +422,11 @@ class LoggingTransaction:
start = time.time()
try:
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"db.query",
- tags={
- opentracing.tags.DATABASE_TYPE: "sql",
- opentracing.tags.DATABASE_STATEMENT: one_line_sql,
+ attributes={
+ tracing.SpanAttributes.DB_SYSTEM: "sql",
+ tracing.SpanAttributes.DB_STATEMENT: one_line_sql,
},
):
return func(sql, *args, **kwargs)
@@ -701,15 +701,15 @@ class DatabasePool:
exception_callbacks=exception_callbacks,
)
try:
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"db.txn",
- tags={
- opentracing.SynapseTags.DB_TXN_DESC: desc,
- opentracing.SynapseTags.DB_TXN_ID: name,
+ attributes={
+ tracing.SynapseTags.DB_TXN_DESC: desc,
+ tracing.SynapseTags.DB_TXN_ID: name,
},
):
r = func(cursor, *args, **kwargs)
- opentracing.log_kv({"message": "commit"})
+ tracing.log_kv({"message": "commit"})
conn.commit()
return r
except self.engine.module.OperationalError as e:
@@ -725,7 +725,7 @@ class DatabasePool:
if i < N:
i += 1
try:
- with opentracing.start_active_span("db.rollback"):
+ with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1)
@@ -739,7 +739,7 @@ class DatabasePool:
if i < N:
i += 1
try:
- with opentracing.start_active_span("db.rollback"):
+ with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning(
@@ -845,7 +845,7 @@ class DatabasePool:
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
- with opentracing.start_active_span(f"db.{desc}"):
+ with tracing.start_active_span(f"db.{desc}"):
result = await self.runWithConnection(
self.new_transaction,
desc,
@@ -928,9 +928,7 @@ class DatabasePool:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
- with opentracing.start_active_span(
- operation_name="db.connection",
- ):
+ with tracing.start_active_span("db.connection"):
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
@@ -944,15 +942,13 @@ class DatabasePool:
"Reconnecting database connection over transaction limit"
)
conn.reconnect()
- opentracing.log_kv(
- {"message": "reconnected due to txn limit"}
- )
+ tracing.log_kv({"message": "reconnected due to txn limit"})
self._txn_counters[tid] = 1
if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
- opentracing.log_kv({"message": "reconnected"})
+ tracing.log_kv({"message": "reconnected"})
if self._txn_limit > 0:
self._txn_counters[tid] = 1
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..1503d74b1f 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -27,7 +27,7 @@ from typing import (
)
from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -436,7 +436,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(user_id, device_id), None
)
- set_tag("last_deleted_stream_id", str(last_deleted_stream_id))
+ set_attribute("last_deleted_stream_id", str(last_deleted_stream_id))
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -485,10 +485,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
A list of messages for the device and where in the stream the messages got to.
"""
- set_tag("destination", destination)
- set_tag("last_stream_id", last_stream_id)
- set_tag("current_stream_id", current_stream_id)
- set_tag("limit", limit)
+ set_attribute("destination", destination)
+ set_attribute("last_stream_id", last_stream_id)
+ set_attribute("current_stream_id", current_stream_id)
+ set_attribute("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index ca0fe8c4be..7ceb7a202b 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -30,11 +30,11 @@ from typing import (
from typing_extensions import Literal
-from synapse.api.constants import EduTypes
+from synapse.api.constants import EduTypes, EventContentFields
from synapse.api.errors import Codes, StoreError
-from synapse.logging.opentracing import (
+from synapse.logging.tracing import (
get_active_span_text_map,
- set_tag,
+ set_attribute,
trace,
whitelisted_homeserver,
)
@@ -333,12 +333,12 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
- # maps (user_id, device_id) -> (stream_id, opentracing_context)
+ # maps (user_id, device_id) -> (stream_id,tracing_context)
#
- # opentracing_context contains the opentracing metadata for the request
+ # tracing_context contains the opentelemetry metadata for the request
# that created the poke
#
- # The most recent request's opentracing_context is used as the
+ # The most recent request's tracing_context is used as the
# context which created the Edu.
# This is the stream ID that we will return for the consumer to resume
@@ -401,8 +401,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
if update_stream_id > previous_update_stream_id:
# FIXME If this overwrites an older update, this discards the
- # previous OpenTracing context.
- # It might make it harder to track down issues using OpenTracing.
+ # previous tracing context.
+ # It might make it harder to track down issues using tracing.
# If there's a good reason why it doesn't matter, a comment here
# about that would not hurt.
query_map[key] = (update_stream_id, update_context)
@@ -468,11 +468,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
- user_id
- device_id
- stream_id
- - opentracing_context
+ - tracing_context
"""
# get the list of device updates that need to be sent
sql = """
- SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
+ SELECT user_id, device_id, stream_id, tracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
@@ -493,7 +493,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
destination: The host the device updates are intended for
from_stream_id: The minimum stream_id to filter updates by, exclusive
query_map: Dictionary mapping (user_id, device_id) to
- (update stream_id, the relevant json-encoded opentracing context)
+ (update stream_id, the relevant json-encoded tracing context)
Returns:
List of objects representing a device update EDU.
@@ -531,13 +531,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
for device_id in device_ids:
device = user_devices[device_id]
- stream_id, opentracing_context = query_map[(user_id, device_id)]
+ stream_id, tracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
- "org.matrix.opentracing_context": opentracing_context,
+ EventContentFields.TRACING_CONTEXT: tracing_context,
}
prev_id = stream_id
@@ -706,8 +706,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
else:
results[user_id] = await self.get_cached_devices_for_user(user_id)
- set_tag("in_cache", str(results))
- set_tag("not_in_cache", str(user_ids_not_in_cache))
+ set_attribute("in_cache", str(results))
+ set_attribute("not_in_cache", str(user_ids_not_in_cache))
return user_ids_not_in_cache, results
@@ -1801,7 +1801,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"device_id",
"sent",
"ts",
- "opentracing_context",
+ "tracing_context",
),
values=[
(
@@ -1846,7 +1846,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"room_id",
"stream_id",
"converted_to_destinations",
- "opentracing_context",
+ "tracing_context",
),
values=[
(
@@ -1870,11 +1870,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
written to `device_lists_outbound_pokes`.
Returns:
- A list of user ID, device ID, room ID, stream ID and optional opentracing context.
+ A list of user ID, device ID, room ID, stream ID and optional opentelemetry context.
"""
sql = """
- SELECT user_id, device_id, room_id, stream_id, opentracing_context
+ SELECT user_id, device_id, room_id, stream_id, tracing_context
FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations
ORDER BY stream_id
@@ -1892,9 +1892,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
device_id,
room_id,
stream_id,
- db_to_json(opentracing_context),
+ db_to_json(tracing_context),
)
- for user_id, device_id, room_id, stream_id, opentracing_context in txn
+ for user_id, device_id, room_id, stream_id, tracing_context in txn
]
return await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index af59be6b48..6d565102ac 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -18,7 +18,7 @@ from typing import Dict, Iterable, Mapping, Optional, Tuple, cast
from typing_extensions import Literal, TypedDict
from synapse.api.errors import StoreError
-from synapse.logging.opentracing import log_kv, trace
+from synapse.logging.tracing import log_kv, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict, JsonSerializable, StreamKeyType
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 46c0d06157..2df8101390 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -36,7 +36,7 @@ from synapse.appservice import (
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
@@ -146,7 +146,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
key data. The key data will be a dict in the same format as the
DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
"""
- set_tag("query_list", str(query_list))
+ set_attribute("query_list", str(query_list))
if not query_list:
return {}
@@ -228,8 +228,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
Dict mapping from user-id to dict mapping from device_id to
key data.
"""
- set_tag("include_all_devices", include_all_devices)
- set_tag("include_deleted_devices", include_deleted_devices)
+ set_attribute("include_all_devices", include_all_devices)
+ set_attribute("include_deleted_devices", include_deleted_devices)
result = await self.db_pool.runInteraction(
"get_e2e_device_keys",
@@ -416,9 +416,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"""
def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
- set_tag("user_id", user_id)
- set_tag("device_id", device_id)
- set_tag("new_keys", str(new_keys))
+ set_attribute("user_id", user_id)
+ set_attribute("device_id", device_id)
+ set_attribute("new_keys", str(new_keys))
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -1158,10 +1158,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool:
- set_tag("user_id", user_id)
- set_tag("device_id", device_id)
- set_tag("time_now", time_now)
- set_tag("device_keys", str(device_keys))
+ set_attribute("user_id", user_id)
+ set_attribute("device_id", device_id)
+ set_attribute("time_now", time_now)
+ set_attribute("device_keys", str(device_keys))
old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c836078da6..41b015dba1 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -33,7 +33,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
-from synapse.logging.opentracing import tag_args, trace
+from synapse.logging.tracing import tag_args, trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a4010ee28d..1c3b804da0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -40,7 +40,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
-from synapse.logging.opentracing import trace
+from synapse.logging.tracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 8a7cdb024d..90e6d82058 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,7 +54,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
-from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.logging.tracing import start_active_span, tag_args, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index a347430aa7..f61f290547 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -58,7 +58,7 @@ from twisted.internet import defer
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import trace
+from synapse.logging.tracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index a9a88c8bfd..dd187f7422 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -75,6 +75,8 @@ Changes in SCHEMA_VERSION = 71:
Changes in SCHEMA_VERSION = 72:
- event_edges.(room_id, is_state) are no longer written to.
- Tables related to groups are dropped.
+ - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
+ from `opentracing_context` to generalized `tracing_context`.
"""
diff --git a/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql b/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql
new file mode 100644
index 0000000000..ae904863f8
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Rename to generalized `tracing_context` since we're moving from opentracing to opentelemetry
+ALTER TABLE device_lists_outbound_pokes RENAME COLUMN opentracing_context TO tracing_context;
+ALTER TABLE device_lists_changes_in_room RENAME COLUMN opentracing_context TO tracing_context;
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index b4bf49dace..07af89ee31 100644
--- a/synapse/storage/util/partial_state_events_tracker.py
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.logging.opentracing import trace_with_opname
+from synapse.logging.tracing import trace_with_opname
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.util import unwrapFirstError
|