Skip to content

Commit 4ced7e1

Browse files
michaelfu1029Michael Fu
andauthored
Add migrate-db command and support airflow downgrades (#247)
*Issue #, if available:* *Description of changes:* Added a new command `migrate-db` for a special container that will run the same airflow db init/upgrade logic but will also check for an environment variable `MWAA__CORE__TARGET_VERSION` to downgrade. Existing inplace upgrade will not be changed for now. The migrate_with_downgrade.py file is a copy or migrate.py but with an additional downgrade check By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: Michael Fu <miklfu@amazon.com>
1 parent 08eb1ab commit 4ced7e1

9 files changed

Lines changed: 423 additions & 0 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
This script is responsible for running Airflow meta database migrations. This will replace
3+
the migrate script.
4+
5+
IMPORTANT NOTE: This script must be run with all the required environments exported,
6+
just like when running any Airflow command, as it imports Airflow modules and needs to
7+
connect to the meta database, thus all configurations need to be set.
8+
"""
9+
10+
from argparse import Namespace
11+
from packaging.version import Version
12+
import logging.config
13+
import os
14+
import sys
15+
16+
from mwaa.utils.dblock import with_db_lock
17+
from airflow.cli.commands import db_command as airflow_db_command
18+
19+
20+
# Usually, we pass the `__name__` variable instead as that defaults to the module path,
21+
# i.e. `mwaa.entrypoint` in this case. However, since this is a script, `__name__` will
22+
# have the value of `__main__`, hence we hard-code the module path.
23+
logger = logging.getLogger("mwaa.database.migrate_with_downgrade")
24+
25+
26+
def _verify_environ():
27+
"""
28+
This script is supposed to have all the environment variables required for running
29+
Airflow, since we will be using Airflow modules directly. This function verifies
30+
they are set by ensuring the existence of the `AWS_EXECUTION_ENV`, which we add
31+
during the creation of the `environ` dictionary in the entrypoint.py.
32+
"""
33+
if not os.environ.get("AWS_EXECUTION_ENV", "").startswith("Amazon_MWAA_"):
34+
logger.error("The necessary environment variables are not set.")
35+
sys.exit(1)
36+
37+
38+
@with_db_lock(1234)
39+
def _migrate_db():
40+
try:
41+
args = Namespace(migration_wait_timeout=1)
42+
airflow_db_command.check_migrations(args)
43+
logging.info("The database is migrated to the current version.")
44+
_check_downgrade_db()
45+
except TimeoutError:
46+
logging.info("The database is not yet migrated. Migrating...")
47+
args = Namespace(
48+
from_revision=None,
49+
from_version=None,
50+
reserialize_dags=True,
51+
show_sql_only=None,
52+
to_revision=None,
53+
to_version=None,
54+
use_migration_files=None,
55+
)
56+
airflow_db_command.migratedb(args)
57+
logging.info("The database is now migrated.")
58+
59+
def _check_downgrade_db():
60+
target_version = os.environ.get("MWAA__CORE__TARGET_VERSION", None)
61+
current_version = os.environ.get("AIRFLOW_VERSION", None)
62+
if target_version and current_version and Version(target_version) < Version(current_version):
63+
logging.info(f"Downgrading the database to {target_version}. Downgrading...")
64+
args = Namespace(
65+
from_revision=None,
66+
from_version=None,
67+
reserialize_dags=True,
68+
show_sql_only=None,
69+
to_revision=None,
70+
to_version=target_version,
71+
use_migration_files=None,
72+
yes=True,
73+
)
74+
airflow_db_command.downgrade(args)
75+
76+
77+
def _main():
78+
_verify_environ()
79+
_migrate_db()
80+
81+
82+
if __name__ == "__main__":
83+
_main()
84+
else:
85+
logger.error(
86+
"This module cannot be imported. It should be run directly using: python -m mwaa.database.migrate_with_downgrade"
87+
)
88+
sys.exit(1)

images/airflow/2.10.1/python/mwaa/entrypoint.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def _configure_root_logger(command: str):
7676
"webserver",
7777
"scheduler",
7878
"worker",
79+
"migrate-db",
7980
"hybrid",
8081
"shell",
8182
"resetdb",
@@ -103,6 +104,21 @@ async def airflow_db_init(environ: dict[str, str]):
103104
await run_command("python3 -m mwaa.database.migrate", env=environ)
104105

105106

107+
async def airflow_db_migrate(environ: dict[str, str]):
108+
"""
109+
Migrate/Initialize Airflow database.
110+
111+
Used in the migrate-db container and will handle both upgrades and downgrades
112+
113+
Before Airflow can be used, a call to `airflow db migrate` must be done. This
114+
function does this. This function is called in the entrypoint to make sure that,
115+
for any Airflow component, the database is initialized before it starts.
116+
117+
:param environ: A dictionary containing the environment variables.
118+
"""
119+
await run_command("python3 -m mwaa.database.migrate_with_downgrade", env=environ)
120+
121+
106122
@with_db_lock(5678)
107123
async def create_airflow_user(environ: dict[str, str]):
108124
"""
@@ -186,13 +202,20 @@ async def main() -> None:
186202
executor_type = os.environ.get("MWAA__CORE__EXECUTOR_TYPE", "CeleryExecutor")
187203
environ = setup_environment_variables(command, executor_type)
188204

205+
if command == "migrate-db":
206+
await airflow_db_migrate(environ)
207+
print("Finished running db validations")
208+
return
209+
189210
await install_user_requirements(command, environ)
190211

191212
if command == "test-requirements":
192213
print("Finished testing requirements")
193214
return
194215

216+
# Remove this when we only want the migrate container to update db
195217
await airflow_db_init(environ)
218+
196219
if os.environ.get("MWAA__CORE__AUTH_TYPE", "").lower() == "testing":
197220
# In "simple" auth mode, we create an admin user "airflow" with password
198221
# "airflow". We use this to make the Docker Compose setup easy to use without
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
This script is responsible for running Airflow meta database migrations. This will replace
3+
the migrate script.
4+
5+
IMPORTANT NOTE: This script must be run with all the required environments exported,
6+
just like when running any Airflow command, as it imports Airflow modules and needs to
7+
connect to the meta database, thus all configurations need to be set.
8+
"""
9+
10+
from argparse import Namespace
11+
from packaging.version import Version
12+
import logging.config
13+
import os
14+
import sys
15+
16+
from mwaa.utils.dblock import with_db_lock
17+
from airflow.cli.commands import db_command as airflow_db_command
18+
19+
20+
# Usually, we pass the `__name__` variable instead as that defaults to the module path,
21+
# i.e. `mwaa.entrypoint` in this case. However, since this is a script, `__name__` will
22+
# have the value of `__main__`, hence we hard-code the module path.
23+
logger = logging.getLogger("mwaa.database.migrate_with_downgrade")
24+
25+
26+
def _verify_environ():
27+
"""
28+
This script is supposed to have all the environment variables required for running
29+
Airflow, since we will be using Airflow modules directly. This function verifies
30+
they are set by ensuring the existence of the `AWS_EXECUTION_ENV`, which we add
31+
during the creation of the `environ` dictionary in the entrypoint.py.
32+
"""
33+
if not os.environ.get("AWS_EXECUTION_ENV", "").startswith("Amazon_MWAA_"):
34+
logger.error("The necessary environment variables are not set.")
35+
sys.exit(1)
36+
37+
38+
@with_db_lock(1234)
39+
def _migrate_db():
40+
try:
41+
args = Namespace(migration_wait_timeout=1)
42+
airflow_db_command.check_migrations(args)
43+
logging.info("The database is migrated to the current version.")
44+
_check_downgrade_db()
45+
except TimeoutError:
46+
logging.info("The database is not yet migrated. Migrating...")
47+
args = Namespace(
48+
from_revision=None,
49+
from_version=None,
50+
reserialize_dags=True,
51+
show_sql_only=None,
52+
to_revision=None,
53+
to_version=None,
54+
use_migration_files=None,
55+
)
56+
airflow_db_command.migratedb(args)
57+
logging.info("The database is now migrated.")
58+
59+
def _check_downgrade_db():
60+
target_version = os.environ.get("MWAA__CORE__TARGET_VERSION", None)
61+
current_version = os.environ.get("AIRFLOW_VERSION", None)
62+
if target_version and current_version and Version(target_version) < Version(current_version):
63+
logging.info(f"Downgrading the database to {target_version}. Downgrading...")
64+
args = Namespace(
65+
from_revision=None,
66+
from_version=None,
67+
reserialize_dags=True,
68+
show_sql_only=None,
69+
to_revision=None,
70+
to_version=target_version,
71+
use_migration_files=None,
72+
yes=True,
73+
)
74+
airflow_db_command.downgrade(args)
75+
76+
77+
def _main():
78+
_verify_environ()
79+
_migrate_db()
80+
81+
82+
if __name__ == "__main__":
83+
_main()
84+
else:
85+
logger.error(
86+
"This module cannot be imported. It should be run directly using: python -m mwaa.database.migrate_with_downgrade"
87+
)
88+
sys.exit(1)

images/airflow/2.10.3/python/mwaa/entrypoint.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def _configure_root_logger(command: str):
7676
"webserver",
7777
"scheduler",
7878
"worker",
79+
"migrate-db",
7980
"hybrid",
8081
"shell",
8182
"resetdb",
@@ -103,6 +104,21 @@ async def airflow_db_init(environ: dict[str, str]):
103104
await run_command("python3 -m mwaa.database.migrate", env=environ)
104105

105106

107+
async def airflow_db_migrate(environ: dict[str, str]):
108+
"""
109+
Migrate/Initialize Airflow database.
110+
111+
Used in the migrate-db container and will handle both upgrades and downgrades
112+
113+
Before Airflow can be used, a call to `airflow db migrate` must be done. This
114+
function does this. This function is called in the entrypoint to make sure that,
115+
for any Airflow component, the database is initialized before it starts.
116+
117+
:param environ: A dictionary containing the environment variables.
118+
"""
119+
await run_command("python3 -m mwaa.database.migrate_with_downgrade", env=environ)
120+
121+
106122
@with_db_lock(5678)
107123
async def create_airflow_user(environ: dict[str, str]):
108124
"""
@@ -186,13 +202,20 @@ async def main() -> None:
186202
executor_type = os.environ.get("MWAA__CORE__EXECUTOR_TYPE", "CeleryExecutor")
187203
environ = setup_environment_variables(command, executor_type)
188204

205+
if command == "migrate-db":
206+
await airflow_db_migrate(environ)
207+
print("Finished running db validations")
208+
return
209+
189210
await install_user_requirements(command, environ)
190211

191212
if command == "test-requirements":
192213
print("Finished testing requirements")
193214
return
194215

216+
# Remove this when we only want the migrate container to update db
195217
await airflow_db_init(environ)
218+
196219
if os.environ.get("MWAA__CORE__AUTH_TYPE", "").lower() == "testing":
197220
# In "simple" auth mode, we create an admin user "airflow" with password
198221
# "airflow". We use this to make the Docker Compose setup easy to use without
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
This script is responsible for running Airflow meta database migrations. This will replace
3+
the migrate script.
4+
5+
IMPORTANT NOTE: This script must be run with all the required environments exported,
6+
just like when running any Airflow command, as it imports Airflow modules and needs to
7+
connect to the meta database, thus all configurations need to be set.
8+
"""
9+
10+
from argparse import Namespace
11+
from packaging.version import Version
12+
import logging.config
13+
import os
14+
import sys
15+
16+
from mwaa.utils.dblock import with_db_lock
17+
from airflow.cli.commands import db_command as airflow_db_command
18+
19+
20+
# Usually, we pass the `__name__` variable instead as that defaults to the module path,
21+
# i.e. `mwaa.entrypoint` in this case. However, since this is a script, `__name__` will
22+
# have the value of `__main__`, hence we hard-code the module path.
23+
logger = logging.getLogger("mwaa.database.migrate_with_downgrade")
24+
25+
26+
def _verify_environ():
27+
"""
28+
This script is supposed to have all the environment variables required for running
29+
Airflow, since we will be using Airflow modules directly. This function verifies
30+
they are set by ensuring the existence of the `AWS_EXECUTION_ENV`, which we add
31+
during the creation of the `environ` dictionary in the entrypoint.py.
32+
"""
33+
if not os.environ.get("AWS_EXECUTION_ENV", "").startswith("Amazon_MWAA_"):
34+
logger.error("The necessary environment variables are not set.")
35+
sys.exit(1)
36+
37+
38+
@with_db_lock(1234)
39+
def _migrate_db():
40+
try:
41+
args = Namespace(migration_wait_timeout=1)
42+
airflow_db_command.check_migrations(args)
43+
logging.info("The database is migrated to the current version.")
44+
_check_downgrade_db()
45+
except TimeoutError:
46+
logging.info("The database is not yet migrated. Migrating...")
47+
args = Namespace(
48+
from_revision=None,
49+
from_version=None,
50+
reserialize_dags=True,
51+
show_sql_only=None,
52+
to_revision=None,
53+
to_version=None,
54+
use_migration_files=None,
55+
)
56+
airflow_db_command.migratedb(args)
57+
logging.info("The database is now migrated.")
58+
59+
def _check_downgrade_db():
60+
target_version = os.environ.get("MWAA__CORE__TARGET_VERSION", None)
61+
current_version = os.environ.get("AIRFLOW_VERSION", None)
62+
if target_version and current_version and Version(target_version) < Version(current_version):
63+
logging.info(f"Downgrading the database to {target_version}. Downgrading...")
64+
args = Namespace(
65+
from_revision=None,
66+
from_version=None,
67+
reserialize_dags=True,
68+
show_sql_only=None,
69+
to_revision=None,
70+
to_version=target_version,
71+
use_migration_files=None,
72+
yes=True,
73+
)
74+
airflow_db_command.downgrade(args)
75+
76+
77+
def _main():
78+
_verify_environ()
79+
_migrate_db()
80+
81+
82+
if __name__ == "__main__":
83+
_main()
84+
else:
85+
logger.error(
86+
"This module cannot be imported. It should be run directly using: python -m mwaa.database.migrate_with_downgrade"
87+
)
88+
sys.exit(1)

0 commit comments

Comments
 (0)