-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Expand file tree
/
Copy pathwrite_api.py
More file actions
95 lines (84 loc) · 3.44 KB
/
write_api.py
File metadata and controls
95 lines (84 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""
API for kicking off new migrations
"""
from __future__ import annotations
from celery.result import AsyncResult
from opaque_keys.edx.locator import LibraryLocatorV2
from openedx_content.api import get_collection
from openedx_content.models_api import LearningPackage
from openedx.core.djangoapps.content_libraries.api import get_library
from openedx.core.types.user import AuthUser
from .. import models, tasks
from ..data import CompositionLevel, RepeatHandlingStrategy, SourceContextKey
# Names exported by `from <this module> import *`: the two entry points defined
# below for starting a single migration and a bulk migration.
__all__ = (
'start_migration_to_library',
'start_bulk_migration_to_library'
)
def start_migration_to_library(
    *,
    user: AuthUser,
    source_key: SourceContextKey,
    target_library_key: LibraryLocatorV2,
    target_collection_slug: str | None = None,
    create_collection: bool = False,
    composition_level: CompositionLevel,
    repeat_handling_strategy: RepeatHandlingStrategy,
    preserve_url_slugs: bool,
    forward_source_to_target: bool | None
) -> AsyncResult:
    """
    Import a single course or legacy library into a V2 library (or into a collection within it).

    This is the one-source convenience form of `start_bulk_migration_to_library`:
    `source_key` and `target_collection_slug` are each wrapped in a one-element
    list, `create_collection` is forwarded as `create_collections`, and every
    other argument is passed through unchanged.

    Returns whatever `start_bulk_migration_to_library` returns for that call.
    """
    # The bulk API works on parallel lists, so lift the single values into lists.
    # A missing slug is forwarded as `[None]`, not as an empty list.
    single_source = [source_key]
    single_collection_slug = [target_collection_slug]

    return start_bulk_migration_to_library(
        user=user,
        target_library_key=target_library_key,
        source_key_list=single_source,
        target_collection_slug_list=single_collection_slug,
        create_collections=create_collection,
        composition_level=composition_level,
        repeat_handling_strategy=repeat_handling_strategy,
        preserve_url_slugs=preserve_url_slugs,
        forward_source_to_target=forward_source_to_target,
    )
def start_bulk_migration_to_library(
    *,
    user: AuthUser,
    source_key_list: list[SourceContextKey],
    target_library_key: LibraryLocatorV2,
    target_collection_slug_list: list[str | None] | None = None,
    create_collections: bool = False,
    composition_level: CompositionLevel,
    repeat_handling_strategy: RepeatHandlingStrategy,
    preserve_url_slugs: bool,
    forward_source_to_target: bool | None,
) -> AsyncResult:
    """
    Import a list of courses or legacy libraries into a V2 library (or into collections within a V2 library).

    The arguments are reduced to plain values (primary keys, strings, enum values)
    and handed to the `bulk_migrate_from_modulestore` task via `.delay()`:

    * The target library is looked up with `get_library`.
    * A `ModulestoreSource` row is fetched or created for each entry of
      `source_key_list`, keyed by `str(source_key)`.
    * Each truthy slug in `target_collection_slug_list` is resolved with
      `get_collection` against the library's learning package; falsy slugs
      (`None` or empty) are forwarded as `None`. If the slug list itself is
      `None` or empty, an empty list is forwarded.

    Returns the value returned by the task's `.delay()` call.
    """
    library = get_library(target_library_key)
    # get_library ensures that the library is connected to a learning package.
    assert library.learning_package_id is not None
    package_id: LearningPackage.ID = library.learning_package_id

    # Sources are resolved before collections, so the source rows exist even if
    # a collection lookup below raises.
    source_rows = (
        models.ModulestoreSource.objects.get_or_create(key=str(key))
        for key in source_key_list
    )
    source_ids: list[int] = [row.id for row, _created in source_rows]

    # One entry per provided slug, in the same order.
    collection_ids: list[int | None] = [
        get_collection(package_id, slug).id if slug else None
        for slug in (target_collection_slug_list or ())
    ]

    # Only plain values are passed to the task: ids, the stringified library key,
    # and each enum's `.value`.
    return tasks.bulk_migrate_from_modulestore.delay(
        user_id=user.id,
        sources_pks=source_ids,
        target_library_key=str(target_library_key),
        target_collection_pks=collection_ids,
        create_collections=create_collections,
        composition_level=composition_level.value,
        repeat_handling_strategy=repeat_handling_strategy.value,
        preserve_url_slugs=preserve_url_slugs,
        forward_source_to_target=forward_source_to_target,
    )