From 3cf7d6d5b6bc938414533d5afe31a67e97c3b7d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2020 13:20:43 +0000 Subject: Move media admin store functions to worker store --- synapse/storage/data_stores/main/room.py | 240 +++++++++++++++---------------- 1 file changed, 120 insertions(+), 120 deletions(-) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 79cfd39194..652518049a 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -366,6 +366,126 @@ class RoomWorkerStore(SQLBaseStore): defer.returnValue(row) + def get_media_mxcs_in_room(self, room_id): + """Retrieves all the local and remote media MXC URIs in a given room + + Args: + room_id (str) + + Returns: + The local and remote media as a lists of tuples where the key is + the hostname and the value is the media ID. + """ + + def _get_media_mxcs_in_room_txn(txn): + local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id) + local_media_mxcs = [] + remote_media_mxcs = [] + + # Convert the IDs to MXC URIs + for media_id in local_mxcs: + local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) + for hostname, media_id in remote_mxcs: + remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) + + return local_media_mxcs, remote_media_mxcs + + return self.db.runInteraction( + "get_media_ids_in_room", _get_media_mxcs_in_room_txn + ) + + def quarantine_media_ids_in_room(self, room_id, quarantined_by): + """For a room loops through all events with media and quarantines + the associated media + """ + + def _quarantine_media_in_room_txn(txn): + local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id) + total_media_quarantined = 0 + + # Now update all the tables to set the quarantined_by flag + + txn.executemany( + """ + UPDATE local_media_repository + SET quarantined_by = ? + WHERE media_id = ? + """, + ((quarantined_by, media_id) for media_id in local_mxcs), + ) + + txn.executemany( + """ + UPDATE remote_media_cache + SET quarantined_by = ? + WHERE media_origin = ? AND media_id = ? + """, + ( + (quarantined_by, origin, media_id) + for origin, media_id in remote_mxcs + ), + ) + + total_media_quarantined += len(local_mxcs) + total_media_quarantined += len(remote_mxcs) + + return total_media_quarantined + + return self.db.runInteraction( + "quarantine_media_in_room", _quarantine_media_in_room_txn + ) + + def _get_media_mxcs_in_room_txn(self, txn, room_id): + """Retrieves all the local and remote media MXC URIs in a given room + + Args: + txn (cursor) + room_id (str) + + Returns: + The local and remote media as a lists of tuples where the key is + the hostname and the value is the media ID. + """ + mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") + + next_token = self.get_current_events_token() + 1 + local_media_mxcs = [] + remote_media_mxcs = [] + + while next_token: + sql = """ + SELECT stream_ordering, json FROM events + JOIN event_json USING (room_id, event_id) + WHERE room_id = ? + AND stream_ordering < ? + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql, (room_id, next_token, True, False, 100)) + + next_token = None + for stream_ordering, content_json in txn: + next_token = stream_ordering + event_json = json.loads(content_json) + content = event_json["content"] + content_url = content.get("url") + thumbnail_url = content.get("info", {}).get("thumbnail_url") + + for url in (content_url, thumbnail_url): + if not url: + continue + matches = mxc_re.match(url) + if matches: + hostname = matches.group(1) + media_id = matches.group(2) + if hostname == self.hs.hostname: + local_media_mxcs.append(media_id) + else: + remote_media_mxcs.append((hostname, media_id)) + + return local_media_mxcs, remote_media_mxcs + class RoomBackgroundUpdateStore(SQLBaseStore): REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -810,126 +930,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): (room_id,), ) - def get_media_mxcs_in_room(self, room_id): - """Retrieves all the local and remote media MXC URIs in a given room - - Args: - room_id (str) - - Returns: - The local and remote media as a lists of tuples where the key is - the hostname and the value is the media ID. - """ - - def _get_media_mxcs_in_room_txn(txn): - local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id) - local_media_mxcs = [] - remote_media_mxcs = [] - - # Convert the IDs to MXC URIs - for media_id in local_mxcs: - local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) - for hostname, media_id in remote_mxcs: - remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) - - return local_media_mxcs, remote_media_mxcs - - return self.db.runInteraction( - "get_media_ids_in_room", _get_media_mxcs_in_room_txn - ) - - def quarantine_media_ids_in_room(self, room_id, quarantined_by): - """For a room loops through all events with media and quarantines - the associated media - """ - - def _quarantine_media_in_room_txn(txn): - local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id) - total_media_quarantined = 0 - - # Now update all the tables to set the quarantined_by flag - - txn.executemany( - """ - UPDATE local_media_repository - SET quarantined_by = ? - WHERE media_id = ? - """, - ((quarantined_by, media_id) for media_id in local_mxcs), - ) - - txn.executemany( - """ - UPDATE remote_media_cache - SET quarantined_by = ? - WHERE media_origin = ? AND media_id = ? - """, - ( - (quarantined_by, origin, media_id) - for origin, media_id in remote_mxcs - ), - ) - - total_media_quarantined += len(local_mxcs) - total_media_quarantined += len(remote_mxcs) - - return total_media_quarantined - - return self.db.runInteraction( - "quarantine_media_in_room", _quarantine_media_in_room_txn - ) - - def _get_media_mxcs_in_room_txn(self, txn, room_id): - """Retrieves all the local and remote media MXC URIs in a given room - - Args: - txn (cursor) - room_id (str) - - Returns: - The local and remote media as a lists of tuples where the key is - the hostname and the value is the media ID. - """ - mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") - - next_token = self.get_current_events_token() + 1 - local_media_mxcs = [] - remote_media_mxcs = [] - - while next_token: - sql = """ - SELECT stream_ordering, json FROM events - JOIN event_json USING (room_id, event_id) - WHERE room_id = ? - AND stream_ordering < ? - AND contains_url = ? AND outlier = ? - ORDER BY stream_ordering DESC - LIMIT ? - """ - txn.execute(sql, (room_id, next_token, True, False, 100)) - - next_token = None - for stream_ordering, content_json in txn: - next_token = stream_ordering - event_json = json.loads(content_json) - content = event_json["content"] - content_url = content.get("url") - thumbnail_url = content.get("info", {}).get("thumbnail_url") - - for url in (content_url, thumbnail_url): - if not url: - continue - matches = mxc_re.match(url) - if matches: - hostname = matches.group(1) - media_id = matches.group(2) - if hostname == self.hs.hostname: - local_media_mxcs.append(media_id) - else: - remote_media_mxcs.append((hostname, media_id)) - - return local_media_mxcs, remote_media_mxcs - @defer.inlineCallbacks def get_rooms_for_retention_period_in_range( self, min_ms, max_ms, include_null=False -- cgit 1.4.1 From 1adf27c82a6ead7f5001b95240c5dadbc0fd386d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2020 13:25:31 +0000 Subject: Import RoomStore in media worker to fix admin APIs --- synapse/app/media_repository.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index a63c53dc44..5b5832214a 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -34,6 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.admin import register_servlets_for_media_repo @@ -47,6 +48,7 @@ logger = logging.getLogger("synapse.app.media_repository") class MediaRepositorySlavedStore( + RoomStore, SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedClientIpStore, -- cgit 1.4.1 From 4e2a072a05c1f894687772e6e55a943f3d941fbc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2020 13:28:19 +0000 Subject: Newsfile --- changelog.d/6664.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6664.bugfix diff --git a/changelog.d/6664.bugfix b/changelog.d/6664.bugfix new file mode 100644 index 0000000000..8c6a6fa1c8 --- /dev/null +++ b/changelog.d/6664.bugfix @@ -0,0 +1 @@ +Fix media repo admin APIs when using a media worker. -- cgit 1.4.1 From 187dc6ad0278c12f15cf2300659d5a1e10ef2446 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2020 14:23:27 +0000 Subject: Do not rely on streaming events, as media repo doesn't --- synapse/storage/data_stores/main/room.py | 38 ++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 652518049a..0509d9f64d 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -448,21 +448,32 @@ class RoomWorkerStore(SQLBaseStore): """ mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") - next_token = self.get_current_events_token() + 1 + next_token = None local_media_mxcs = [] remote_media_mxcs = [] - while next_token: - sql = """ - SELECT stream_ordering, json FROM events - JOIN event_json USING (room_id, event_id) - WHERE room_id = ? - AND stream_ordering < ? - AND contains_url = ? AND outlier = ? - ORDER BY stream_ordering DESC - LIMIT ? - """ - txn.execute(sql, (room_id, next_token, True, False, 100)) + while True: + if next_token is None: + sql = """ + SELECT stream_ordering, json FROM events + JOIN event_json USING (room_id, event_id) + WHERE room_id = ? + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql, (room_id, True, False, 100)) + else: + sql = """ + SELECT stream_ordering, json FROM events + JOIN event_json USING (room_id, event_id) + WHERE room_id = ? + AND stream_ordering < ? + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql, (room_id, next_token, True, False, 100)) next_token = None for stream_ordering, content_json in txn: @@ -484,6 +495,9 @@ class RoomWorkerStore(SQLBaseStore): else: remote_media_mxcs.append((hostname, media_id)) + if next_token is None: + break + return local_media_mxcs, remote_media_mxcs -- cgit 1.4.1 From bca3455b3860b80199e3b750a99de6e13d636d82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2020 14:27:35 +0000 Subject: Comments --- synapse/storage/data_stores/main/room.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 0509d9f64d..11e93fd668 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -453,6 +453,8 @@ class RoomWorkerStore(SQLBaseStore): remote_media_mxcs = [] while True: + # The first time round we just want to get the most recent + # events, then we bound by stream ordering if next_token is None: sql = """ SELECT stream_ordering, json FROM events @@ -496,6 +498,7 @@ class RoomWorkerStore(SQLBaseStore): remote_media_mxcs.append((hostname, media_id)) if next_token is None: + # We've gone through the whole room, so we're finished. break return local_media_mxcs, remote_media_mxcs -- cgit 1.4.1 From d74054afda9cf7c63c9f74a9b55c02f9df321d6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2020 14:57:45 +0000 Subject: Shuffle the code --- synapse/storage/data_stores/main/room.py | 41 +++++++++++++------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 11e93fd668..8636d75030 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -448,35 +448,21 @@ class RoomWorkerStore(SQLBaseStore): """ mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)") - next_token = None + sql = """ + SELECT stream_ordering, json FROM events + JOIN event_json USING (room_id, event_id) + WHERE room_id = ? + %(where_clause)s + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100)) + local_media_mxcs = [] remote_media_mxcs = [] while True: - # The first time round we just want to get the most recent - # events, then we bound by stream ordering - if next_token is None: - sql = """ - SELECT stream_ordering, json FROM events - JOIN event_json USING (room_id, event_id) - WHERE room_id = ? - AND contains_url = ? AND outlier = ? - ORDER BY stream_ordering DESC - LIMIT ? - """ - txn.execute(sql, (room_id, True, False, 100)) - else: - sql = """ - SELECT stream_ordering, json FROM events - JOIN event_json USING (room_id, event_id) - WHERE room_id = ? - AND stream_ordering < ? - AND contains_url = ? AND outlier = ? - ORDER BY stream_ordering DESC - LIMIT ? - """ - txn.execute(sql, (room_id, next_token, True, False, 100)) - next_token = None for stream_ordering, content_json in txn: next_token = stream_ordering @@ -501,6 +487,11 @@ class RoomWorkerStore(SQLBaseStore): # We've gone through the whole room, so we're finished. break + txn.execute( + sql % {"where_clause": "AND stream_ordering < ?"}, + (room_id, next_token, True, False, 100), + ) + return local_media_mxcs, remote_media_mxcs -- cgit 1.4.1