diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a3c52695e9..0b9007e51f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -36,7 +36,7 @@ from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
__all__ = ["Databases", "DataStore"]
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a25c4093bc..240905329f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -27,7 +27,7 @@ from synapse.types import Collection, StreamToken, get_domain_from_id
from synapse.util import json_decoder
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 329660cf0f..ccb06aab39 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -23,7 +23,7 @@ from synapse.util import json_encoder
from . import engines
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingTransaction
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 03a38422a1..85bb853d33 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -32,7 +32,7 @@ from synapse.types import JsonDict
from synapse.util import json_encoder
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 45ca6620a8..691080ce74 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import List, Tuple
+from typing import List, Optional, Tuple
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
@@ -115,7 +115,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
async def get_new_messages_for_device(
self,
user_id: str,
- device_id: str,
+ device_id: Optional[str],
last_stream_id: int,
current_stream_id: int,
limit: int = 100,
@@ -163,7 +163,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
- self, user_id: str, device_id: str, up_to_stream_id: int
+ self, user_id: str, device_id: Optional[str], up_to_stream_id: int
) -> int:
"""
Args:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 332193ad1c..a956be491a 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -793,7 +793,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return int(min_depth) if min_depth is not None else None
- async def get_forward_extremeties_for_room(
+ async def get_forward_extremities_for_room_at_stream_ordering(
self, room_id: str, stream_ordering: int
) -> List[str]:
"""For a given room_id and stream_ordering, return the forward
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index cd1ceac50e..98dac19a95 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1270,8 +1270,10 @@ class PersistEventsStore:
logger.exception("")
raise
+ # update the stored internal_metadata to update the "outlier" flag.
+ # TODO: This is unused as of Synapse 1.31. Remove it once we are happy
+ # to drop backwards-compatibility with 1.30.
metadata_json = json_encoder.encode(event.internal_metadata.get_dict())
-
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
@@ -1319,6 +1321,19 @@ class PersistEventsStore:
d.pop("redacted_because", None)
return d
+ def get_internal_metadata(event):
+ im = event.internal_metadata.get_dict()
+
+ # temporary hack for database compatibility with Synapse 1.30 and earlier:
+ # store the `outlier` flag inside the internal_metadata json as well as in
+ # the `events` table, so that if anyone rolls back to an older Synapse,
+ # things keep working. This can be removed once we are happy to drop support
+ # for that
+ if event.internal_metadata.is_outlier():
+ im["outlier"] = True
+
+ return im
+
self.db_pool.simple_insert_many_txn(
txn,
table="event_json",
@@ -1327,7 +1342,7 @@ class PersistEventsStore:
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": json_encoder.encode(
- event.internal_metadata.get_dict()
+ get_internal_metadata(event)
),
"json": json_encoder.encode(event_dict(event)),
"format_version": event.format_version,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c04e162ccc..952d4969b2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -799,6 +799,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
+ original_ev.internal_metadata.outlier = row["outlier"]
event_map[event_id] = original_ev
@@ -905,7 +906,8 @@ class EventsWorkerStore(SQLBaseStore):
ej.json,
ej.format_version,
r.room_version,
- rej.reason
+ rej.reason,
+ e.outlier
FROM events AS e
JOIN event_json AS ej USING (event_id)
LEFT JOIN rooms r ON r.room_id = e.room_id
@@ -929,6 +931,7 @@ class EventsWorkerStore(SQLBaseStore):
"room_version_id": row[5],
"rejected_reason": row[6],
"redactions": [],
+ "outlier": row[7],
}
# check for redactions
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index d788dc0fc6..757da3d55d 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Dict, List
+from typing import Dict, List, Optional
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
@@ -109,7 +109,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
return users
@cached(num_args=1)
- async def user_last_seen_monthly_active(self, user_id: str) -> int:
+ async def user_last_seen_monthly_active(self, user_id: str) -> Optional[int]:
"""
Checks if a given user is part of the monthly active user group
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 85f1ebac98..c65558c280 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -27,7 +27,7 @@ from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index eba66ff352..90a8f664ef 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1210,6 +1210,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.get_user_deactivated_status, (user_id,)
)
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@cached()
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0309661841..b7072f1f5e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
@@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore):
stream_ordering: the stream_ordering of the event
"""
- return await self.db_pool.runInteraction(
- "store_destination_rooms_entries",
- self._store_destination_rooms_entries_txn,
- destinations,
- room_id,
- stream_ordering,
+ await self.db_pool.simple_upsert_many(
+ table="destinations",
+ key_names=("destination",),
+ key_values=[(d,) for d in destinations],
+ value_names=[],
+ value_values=[],
+ desc="store_destination_rooms_entries_dests",
)
- def _store_destination_rooms_entries_txn(
- self,
- txn: LoggingTransaction,
- destinations: Iterable[str],
- room_id: str,
- stream_ordering: int,
- ) -> None:
-
- # ensure we have a `destinations` row for this destination, as there is
- # a foreign key constraint.
- if isinstance(self.database_engine, PostgresEngine):
- q = """
- INSERT INTO destinations (destination)
- VALUES (?)
- ON CONFLICT DO NOTHING;
- """
- elif isinstance(self.database_engine, Sqlite3Engine):
- q = """
- INSERT OR IGNORE INTO destinations (destination)
- VALUES (?);
- """
- else:
- raise RuntimeError("Unknown database engine")
-
- txn.execute_batch(q, ((destination,) for destination in destinations))
-
rows = [(destination, room_id) for destination in destinations]
-
- self.db_pool.simple_upsert_many_txn(
- txn,
+ await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
+ desc="store_destination_rooms_entries_rooms",
)
async def get_destination_last_successful_stream_ordering(
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
index 4dcd848c59..ad954990a7 100644
--- a/synapse/storage/purge_events.py
+++ b/synapse/storage/purge_events.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Set
from synapse.storage.databases import Databases
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d179a41884..2e277a21c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -32,7 +32,7 @@ from synapse.events import EventBase
from synapse.types import MutableStateMap, StateMap
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
from synapse.storage.databases import Databases
logger = logging.getLogger(__name__)
@@ -449,7 +449,7 @@ class StateGroupStorage:
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
async def get_state_for_events(
- self, event_ids: List[str], state_filter: StateFilter = StateFilter.all()
+ self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all()
) -> Dict[str, StateMap[EventBase]]:
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event.
@@ -485,7 +485,7 @@ class StateGroupStorage:
return {event: event_to_state[event] for event in event_ids}
async def get_state_ids_for_events(
- self, event_ids: List[str], state_filter: StateFilter = StateFilter.all()
+ self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all()
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
|