diff --git a/synapse/__init__.py b/synapse/__init__.py
index 0900492619..1bd03462ac 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
-__version__ = "1.37.1a1"
+__version__ = "1.37.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 29cf257633..f8b068e563 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
import pymacaroons
from netaddr import IPAddress
@@ -31,6 +31,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.appservice import ApplicationService
from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing as opentracing
@@ -490,7 +491,7 @@ class Auth:
def compute_auth_events(
self,
- event,
+ event: Union[EventBase, EventBuilder],
current_state_ids: StateMap[str],
for_verification: bool = False,
) -> List[str]:
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 414e4c019a..8363c2bb0f 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -201,6 +201,12 @@ class EventContentFields:
)
+class RoomTypes:
+ """Understood values of the room_type field of m.room.create events."""
+
+ SPACE = "m.space"
+
+
class RoomEncryptionAlgorithms:
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
DEFAULT = MEGOLM_V1_AES_SHA2
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 33d7c60241..89bcf81515 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
@@ -29,6 +29,7 @@ from synapse.api.room_versions import (
RoomVersion,
)
from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
from synapse.types import StateMap, UserID, get_domain_from_id
logger = logging.getLogger(__name__)
@@ -724,7 +725,7 @@ def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
return public_keys
-def auth_types_for_event(event: EventBase) -> Set[Tuple[str, str]]:
+def auth_types_for_event(event: Union[EventBase, EventBuilder]) -> Set[Tuple[str, str]]:
"""Given an event, return a list of (EventType, StateKey) that may be
needed to auth the event. The returned list may be a superset of what
would actually be required depending on the full state of the room.
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 0cb9c1cc1e..6286ad999a 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -118,7 +118,7 @@ class _EventInternalMetadata:
proactively_send = DictProperty("proactively_send") # type: bool
redacted = DictProperty("redacted") # type: bool
txn_id = DictProperty("txn_id") # type: str
- token_id = DictProperty("token_id") # type: str
+ token_id = DictProperty("token_id") # type: int
historical = DictProperty("historical") # type: bool
# XXX: These are set by StreamWorkerStore._set_before_and_after.
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 81bf8615b7..fb48ec8541 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -12,12 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
import attr
from nacl.signing import SigningKey
-from synapse.api.auth import Auth
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import UnsupportedRoomVersionError
from synapse.api.room_versions import (
@@ -34,10 +33,14 @@ from synapse.types import EventID, JsonDict
from synapse.util import Clock
from synapse.util.stringutils import random_string
+if TYPE_CHECKING:
+ from synapse.api.auth import Auth
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
-@attr.s(slots=True, cmp=False, frozen=True)
+@attr.s(slots=True, cmp=False, frozen=True, auto_attribs=True)
class EventBuilder:
"""A format independent event builder used to build up the event content
before signing the event.
@@ -62,31 +65,30 @@ class EventBuilder:
_signing_key: The signing key to use to sign the event as the server
"""
- _state = attr.ib(type=StateHandler)
- _auth = attr.ib(type=Auth)
- _store = attr.ib(type=DataStore)
- _clock = attr.ib(type=Clock)
- _hostname = attr.ib(type=str)
- _signing_key = attr.ib(type=SigningKey)
+ _state: StateHandler
+ _auth: "Auth"
+ _store: DataStore
+ _clock: Clock
+ _hostname: str
+ _signing_key: SigningKey
- room_version = attr.ib(type=RoomVersion)
+ room_version: RoomVersion
- room_id = attr.ib(type=str)
- type = attr.ib(type=str)
- sender = attr.ib(type=str)
+ room_id: str
+ type: str
+ sender: str
- content = attr.ib(default=attr.Factory(dict), type=JsonDict)
- unsigned = attr.ib(default=attr.Factory(dict), type=JsonDict)
+ content: JsonDict = attr.Factory(dict)
+ unsigned: JsonDict = attr.Factory(dict)
# These only exist on a subset of events, so they raise AttributeError if
# someone tries to get them when they don't exist.
- _state_key = attr.ib(default=None, type=Optional[str])
- _redacts = attr.ib(default=None, type=Optional[str])
- _origin_server_ts = attr.ib(default=None, type=Optional[int])
+ _state_key: Optional[str] = None
+ _redacts: Optional[str] = None
+ _origin_server_ts: Optional[int] = None
- internal_metadata = attr.ib(
- default=attr.Factory(lambda: _EventInternalMetadata({})),
- type=_EventInternalMetadata,
+ internal_metadata: _EventInternalMetadata = attr.Factory(
+ lambda: _EventInternalMetadata({})
)
@property
@@ -184,7 +186,7 @@ class EventBuilder:
class EventBuilderFactory:
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.hostname = hs.hostname
self.signing_key = hs.signing_key
@@ -193,15 +195,14 @@ class EventBuilderFactory:
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
- def new(self, room_version, key_values):
+ def new(self, room_version: str, key_values: dict) -> EventBuilder:
"""Generate an event builder appropriate for the given room version
Deprecated: use for_room_version with a RoomVersion object instead
Args:
- room_version (str): Version of the room that we're creating an event builder
- for
- key_values (dict): Fields used as the basis of the new event
+ room_version: Version of the room that we're creating an event builder for
+ key_values: Fields used as the basis of the new event
Returns:
EventBuilder
@@ -212,13 +213,15 @@ class EventBuilderFactory:
raise UnsupportedRoomVersionError()
return self.for_room_version(v, key_values)
- def for_room_version(self, room_version, key_values):
+ def for_room_version(
+ self, room_version: RoomVersion, key_values: dict
+ ) -> EventBuilder:
"""Generate an event builder appropriate for the given room version
Args:
- room_version (synapse.api.room_versions.RoomVersion):
+ room_version:
Version of the room that we're creating an event builder for
- key_values (dict): Fields used as the basis of the new event
+ key_values: Fields used as the basis of the new event
Returns:
EventBuilder
@@ -286,15 +289,15 @@ def create_local_event_from_event_dict(
_event_id_counter = 0
-def _create_event_id(clock, hostname):
+def _create_event_id(clock: Clock, hostname: str) -> str:
"""Create a new event ID
Args:
- clock (Clock)
- hostname (str): The server name for the event ID
+ clock
+ hostname: The server name for the event ID
Returns:
- str
+ The new event ID
"""
global _event_id_counter
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 742d29291e..e93b7577fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
async def process_pdu(pdu: EventBase) -> JsonDict:
event_id = pdu.event_id
- with pdu_process_time.time():
- with nested_logging_context(event_id):
- try:
- await self._handle_received_pdu(origin, pdu)
- return {}
- except FederationError as e:
- logger.warning("Error handling PDU %s: %s", event_id, e)
- return {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- logger.error(
- "Failed to handle PDU %s",
- event_id,
- exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
- )
- return {"error": str(e)}
+ with nested_logging_context(event_id):
+ try:
+ await self._handle_received_pdu(origin, pdu)
+ return {}
+ except FederationError as e:
+ logger.warning("Error handling PDU %s: %s", event_id, e)
+ return {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ logger.error(
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
+ )
+ return {"error": str(e)}
await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
- await self.store.remove_received_event_from_staging(
+ received_ts = await self.store.remove_received_event_from_staging(
origin, event.event_id
)
+ if received_ts is not None:
+ pdu_process_time.observe(
+ (self._clock.time_msec() - received_ts) / 1000
+ )
# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2c1b10f652..2a7a6e6982 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -509,6 +509,8 @@ class EventCreationHandler:
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+ If non-None, prev_event_ids must also be provided.
+
require_consent: Whether to check if the requester has
consented to the privacy policy.
@@ -581,6 +583,9 @@ class EventCreationHandler:
# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
if auth_event_ids is not None:
+ # If auth events are provided, prev events must be also.
+ assert prev_event_ids is not None
+
temp_event = await builder.build(
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
@@ -784,6 +789,8 @@ class EventCreationHandler:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+
+ If non-None, prev_event_ids must also be provided.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 17fc47ce16..266f369883 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -25,6 +25,7 @@ from synapse.api.constants import (
EventTypes,
HistoryVisibility,
Membership,
+ RoomTypes,
)
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
@@ -318,7 +319,8 @@ class SpaceSummaryHandler:
Returns:
A tuple of:
- An iterable of a single value of the room.
+ The room information, if the room should be returned to the
+ user. None, otherwise.
An iterable of the sorted children events. This may be limited
to a maximum size or may include all children.
@@ -328,7 +330,11 @@ class SpaceSummaryHandler:
room_entry = await self._build_room_entry(room_id)
- # look for child rooms/spaces.
+ # If the room is not a space, return just the room information.
+ if room_entry.get("room_type") != RoomTypes.SPACE:
+ return room_entry, ()
+
+ # Otherwise, look for child rooms/spaces.
child_events = await self._get_child_events(room_id)
if suggested_only:
@@ -348,6 +354,7 @@ class SpaceSummaryHandler:
event_format=format_event_for_client_v2,
)
)
+
return room_entry, events_result
async def _summarize_remote_room(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..f2d27ee893 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
self,
origin: str,
event_id: str,
- ) -> None:
- """Remove the given event from the staging area"""
- await self.db_pool.simple_delete(
- table="federation_inbound_events_staging",
- keyvalues={
- "origin": origin,
- "event_id": event_id,
- },
- desc="remove_received_event_from_staging",
- )
+ ) -> Optional[int]:
+ """Remove the given event from the staging area.
+
+ Returns:
+ The received_ts of the row that was deleted, if any.
+ """
+ if self.db_pool.engine.supports_returning:
+
+ def _remove_received_event_from_staging_txn(txn):
+ sql = """
+ DELETE FROM federation_inbound_events_staging
+ WHERE origin = ? AND event_id = ?
+ RETURNING received_ts
+ """
+
+ txn.execute(sql, (origin, event_id))
+ return txn.fetchone()
+
+ row = await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ db_autocommit=True,
+ )
+ if row is None:
+ return None
+
+ return row[0]
+
+ else:
+
+ def _remove_received_event_from_staging_txn(txn):
+ received_ts = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ allow_none=True,
+ )
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ )
+
+ return received_ts
+
+ return await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ )
async def get_next_staged_event_id_for_room(
self,
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index da3a7df27b..1c95c66648 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,13 +29,18 @@ from synapse.types import JsonDict
logger = logging.getLogger(__name__)
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
# there should be no leftover rows without a stream_ordering2, but just in case...
"UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
- # finally, we can drop the rule and switch the columns
+ # now we can drop the rule and switch the columns
"DROP RULE populate_stream_ordering2 ON events",
"ALTER TABLE events DROP COLUMN stream_ordering",
"ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+ # ... and finally, rename the indexes into place for consistency with sqlite
+ "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+ "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+ "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+ "ALTER INDEX events_ts2 RENAME TO events_ts",
)
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+ INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+ INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+ INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+ INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
@@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._purged_chain_cover_index,
)
+ ################################################################################
+
# bg updates for replacing stream_ordering with a BIGINT
# (these only run on postgres.)
+
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
self._background_populate_stream_ordering2,
)
+ # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2,
index_name="events_stream_ordering",
@@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
columns=["stream_ordering2"],
unique=True,
)
+ # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+ self.db_pool.updates.register_background_index_update(
+ _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+ index_name="event_contains_url_index2",
+ table="events",
+ columns=["room_id", "topological_ordering", "stream_ordering2"],
+ where_clause="contains_url = true AND outlier = false",
+ )
+ # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+ self.db_pool.updates.register_background_index_update(
+ _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+ index_name="events_order_room2",
+ table="events",
+ columns=["room_id", "topological_ordering", "stream_ordering2"],
+ )
+ # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+ self.db_pool.updates.register_background_index_update(
+ _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+ index_name="events_room_stream2",
+ table="events",
+ columns=["room_id", "stream_ordering2"],
+ )
+ # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+ self.db_pool.updates.register_background_index_update(
+ _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+ index_name="events_ts2",
+ table="events",
+ columns=["origin_server_ts", "stream_ordering2"],
+ )
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
self._background_replace_stream_ordering_column,
)
+ ################################################################################
+
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
"""Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
def process(txn: Cursor) -> None:
- for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+ for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
logger.info("completing stream_ordering migration: %s", sql)
txn.execute(sql)
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
"""
...
+ @property
+ @abc.abstractmethod
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ ...
+
@abc.abstractmethod
def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
"""Do we support using `a = ANY(?)` and passing a list"""
return True
+ @property
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ return True
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
"""Do we support using `a = ANY(?)` and passing a list"""
return False
+ @property
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ return self.module.sqlite_version_info >= (3, 35, 0)
+
def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version:
version = self.module.sqlite_version_info
diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
index 88c9f8bd0d..b5fb763ddd 100644
--- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
+++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
@@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6001, 'populate_stream_ordering2', '{}');
--- ... and another to build an index on it
+-- ... and some more to build indexes on it. These aren't really interdependent
+-- but the backround_updates manager can only handle a single dependency per update.
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
- (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
+ (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
+ (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
+ (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
+ (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
+ (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');
-- ... and another to do the switcheroo
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
- (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
+ (6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');
diff --git a/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
new file mode 100644
index 0000000000..630c24fd9e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'.
+--
+-- It updates the other tables which use an INTEGER to refer to a stream ordering.
+-- These tables are all small enough that a re-create is tractable.
+ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT;
+ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT;
+
+-- these aren't actually event stream orderings, but they are numbers where 2 billion
+-- is a bit limiting, application_services_state is tiny, and I don't want to ever have
+-- to do this again.
+ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT;
+
+
|