Skip to content

SDK Reference

vermillio.sdk.core

ExternalPipeline pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class ExternalPipeline(BaseModel):
    # Public view of a pipeline: its identity, when it was created, and the
    # contexts that sources can be run or loaded against.

    id: str = Field(description="Unique id of the pipeline.")
    name: str = Field(description="The friendly name of the pipeline.")

    # Free-form text; absent unless the pipeline was given one.
    description: Optional[str] = Field(
        description="Description of what the purpose of this pipeline is.",
        default=None,
    )

    # Creation timestamp in epoch seconds.
    created_at: float = Field(
        description="The epoch time in seconds of when this pipeline was created.",
    )

    contexts: list[str] = Field(
        description="The list of contexts available in this pipeline.",
    )

    # Context applied by endpoints whenever the caller does not name one.
    default_context: str = Field(
        description="The default context used if none is provided in endpoints.",
    )

contexts pydantic-field

contexts: list[str]

The list of contexts available in this pipeline.

created_at pydantic-field

created_at: float

The epoch time in seconds of when this pipeline was created.

default_context pydantic-field

default_context: str

The default context used if none is provided in endpoints.

description pydantic-field

description: Optional[str] = None

Description of what the purpose of this pipeline is.

id pydantic-field

id: str

Unique id of the pipeline.

name pydantic-field

name: str

The friendly name of the pipeline.

ExternalSource pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
22
23
24
25
26
27
28
29
30
31
32
33
34
class ExternalSource(BaseModel):
    # Caller-side description of a source. Every identifier is optional and
    # defaults to None.

    # The caller's own identifier for this request.
    source_id: Optional[str] = Field(
        description="A (optional) unique id for the source that represents this request in your system.",
        default=None,
    )

    # Pass-through identifiers forwarded to the underlying pipeline call.
    media_id: Optional[str] = Field(
        description="Optional media id to pass through to the underlying pipeline call.",
        default=None,
    )
    asset_id: Optional[str] = Field(
        description="Optional asset id to pass through to the underlying pipeline call.",
        default=None,
    )

asset_id pydantic-field

asset_id: Optional[str] = None

Optional asset id to pass through to the underlying pipeline call.

media_id pydantic-field

media_id: Optional[str] = None

Optional media id to pass through to the underlying pipeline call.

source_id pydantic-field

source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

PipelineLoadRequest pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
47
48
49
50
51
52
53
54
class PipelineLoadRequest(BaseModel):
    # Payload for loading a batch of sources into a pipeline.

    # When None, the pipeline's default context applies.
    context: Optional[str] = Field(
        description="Override the default context, if available, to load the sources against.",
        default=None,
    )

    # The annotation accepts either a list of ExternalSource objects or a list
    # of file paths; the two kinds are not mixed within one list.
    sources: Union[list[ExternalSource], list[str]] = Field(
        description="The sources to load, either an external source or a path to a file (if the pipeline supports that).",
    )

context pydantic-field

context: Optional[str] = None

Override the default context, if available, to load the sources against.

sources pydantic-field

sources: Union[list[ExternalSource], list[str]]

The sources to load: either a list of external sources or a list of file paths (if the pipeline supports that).

PipelineResults pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PipelineResults(BaseModel):
    # Result record for a single pipeline source: its processing status,
    # timestamps (epoch seconds), and the caller-supplied identifiers echoed back.

    id: str = Field(description="Vermillio's unique id for this pipeline source.")
    status: PipelineSourceStatus = Field(
        description="The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this source was created."
    )
    # default=None like the other Optional fields: in pydantic v2 an Optional
    # field without a default is still *required*, so a payload omitting
    # updated_at would otherwise fail validation.
    updated_at: Optional[float] = Field(
        default=None,
        description="The last epoch time in seconds of when this source was last updated.",
    )
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    source_path: Optional[str] = Field(
        default=None,
        description="The (optional) source path that was provided in the external source.",
    )

    @property
    def is_finished(self) -> bool:
        """
        Return True once the source has reached a terminal status.

        "Errored" is not terminal: per the status description, errored sources
        will be retried.
        """
        # NOTE(review): assumes PipelineSourceStatus values compare equal to these
        # strings (e.g. a Literal or a str-based Enum) — confirm.
        return self.status in ("Succeeded", "Failed", "Deleted")

created_at pydantic-field

created_at: float

The epoch time in seconds of when this source was created.

id pydantic-field

id: str

Vermillio's unique id for this pipeline source.

source_id pydantic-field

source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

source_path pydantic-field

source_path: Optional[str] = None

The (optional) source path that was provided in the external source.

status pydantic-field

status: PipelineSourceStatus

The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion.

updated_at pydantic-field

updated_at: Optional[float]

The last epoch time in seconds of when this source was last updated.

PipelineRunRequest pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
37
38
39
40
41
42
43
44
class PipelineRunRequest(BaseModel):
    # Payload for running a single source through a pipeline.

    # When None, the pipeline's default context applies.
    context: Optional[str] = Field(
        description="Override the default context, if available, to run the source against.",
        default=None,
    )

    # Either a structured ExternalSource or a plain file path.
    source: Union[ExternalSource, str] = Field(
        description="The source to run, either an external source or a path to a file (if the pipeline supports that).",
    )

context pydantic-field

context: Optional[str] = None

Override the default context, if available, to run the source against.

source pydantic-field

source: Union[ExternalSource, str]

The source to run: either an external source or a path to a file (if the pipeline supports that).

PipelineSource pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class PipelineSource(BaseModel):
    # A source registered with a pipeline: processing status, timestamps
    # (epoch seconds), and the caller-supplied / pass-through identifiers.

    id: str = Field(description="Vermillio's unique id for this pipeline source.")
    status: PipelineSourceStatus = Field(
        description="The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this source was created."
    )
    # default=None like the other Optional fields: in pydantic v2 an Optional
    # field without a default is still *required*, so a payload omitting
    # updated_at would otherwise fail validation.
    updated_at: Optional[float] = Field(
        default=None,
        description="The last epoch time in seconds of when this source was last updated.",
    )
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    media_id: Optional[str] = Field(
        default=None,
        description="Optional media id to pass through to the underlying pipeline call.",
    )
    asset_id: Optional[str] = Field(
        default=None,
        description="Optional asset id to pass through to the underlying pipeline call.",
    )

asset_id pydantic-field

asset_id: Optional[str] = None

Optional asset id to pass through to the underlying pipeline call.

created_at pydantic-field

created_at: float

The epoch time in seconds of when this source was created.

id pydantic-field

id: str

Vermillio's unique id for this pipeline source.

media_id pydantic-field

media_id: Optional[str] = None

Optional media id to pass through to the underlying pipeline call.

source_id pydantic-field

source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

status pydantic-field

status: PipelineSourceStatus

The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion.

updated_at pydantic-field

updated_at: Optional[float]

The last epoch time in seconds of when this source was last updated.

PipelineStatus pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class PipelineStatus(BaseModel):
    # Per-status source counts for one pipeline context. All five counts are
    # required; there is no count for Deleted sources.

    # Not started yet.
    pending_count: int = Field(
        description="The number of sources under this context with status == 'Pending'",
    )
    # In progress.
    running_count: int = Field(
        description="The number of sources under this context with status == 'Running'",
    )
    # Completed successfully.
    succeeded_count: int = Field(
        description="The number of sources under this context with status == 'Succeeded'",
    )
    # Hit an unexpected error (retried, per the source status description).
    errored_count: int = Field(
        description="The number of sources under this context with status == 'Errored'",
    )
    # Completed unsuccessfully.
    failed_count: int = Field(
        description="The number of sources under this context with status == 'Failed'",
    )

errored_count pydantic-field

errored_count: int

The number of sources under this context with status == 'Errored'

failed_count pydantic-field

failed_count: int

The number of sources under this context with status == 'Failed'

pending_count pydantic-field

pending_count: int

The number of sources under this context with status == 'Pending'

running_count pydantic-field

running_count: int

The number of sources under this context with status == 'Running'

succeeded_count pydantic-field

succeeded_count: int

The number of sources under this context with status == 'Succeeded'

VermillioAssets

Bases: VermillioBaseClient

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class VermillioAssets(VermillioBaseClient):
    """
    Client for the ``/partner/asset`` endpoints: listing, fetching and uploading
    assets, and downloading licensed media (single files, per license, zipped,
    and in bulk).

    Note: this class defines a method named ``list``, so annotations in the class
    body use ``typing.List`` rather than the builtin ``list``.
    """

    def list(self) -> List[PartnerAsset]:
        """
        list all partner assets (an empty list when ``_get`` returns nothing)
        """
        res = self._get("/partner/asset/list", RootModel[List[PartnerAsset]])
        return res.root if res else []

    def get(self, id: str) -> Optional[PartnerAsset]:
        """
        get a single partner asset by id, or None when ``_get`` returns nothing
        """
        return self._get(f"/partner/asset/{id}", PartnerAsset)

    def upload(
        self,
        file_path: str,
        asset_type: AssetType,
        title: str,
        parent_asset_id: Optional[str] = None,
    ) -> PartnerAsset:
        """
        upload a local file as a new partner asset

        ``type`` and ``title`` are always sent; ``parentAssetId`` only when
        ``parent_asset_id`` is truthy. The open file is passed to ``_post`` keyed
        by its basename.

        Raises AssertionError if ``file_path`` is not an existing file.
        """
        # NOTE(review): ``assert`` is skipped under ``python -O``; open() then raises
        # instead. Kept because callers may rely on AssertionError here.
        assert os.path.isfile(file_path), f"Failed to find file: '{file_path}'"
        params = {
            "type": asset_type,
            "title": title,
            **({"parentAssetId": parent_asset_id} if parent_asset_id else {}),
        }
        with open(file_path, "rb") as f:
            return self._post(
                "/partner/asset/upload",
                None,
                PartnerAsset,
                params,
                {os.path.basename(file_path): f},
            )

    def _get_filename(self, file_name: str):
        """
        Return the last path component of a URL (query string and fragment ignored).
        """
        parsed_url = urlparse(file_name)

        return Path(parsed_url.path).name

    def _download_url(self, url: str, download_path: str) -> str:
        """
        Download ``url`` into directory ``download_path`` and return the local path.
        The file name is taken from the URL's path.
        """
        path = os.path.join(download_path, self._get_filename(url))
        self._download_file(url, path)
        return path

    def download(self, media_id: str, download_path: str = "") -> Optional[str]:
        """
        downloads a specific media

        The file is written under ``download_path``, inside the server-provided
        ``local_path`` sub-directory (created if needed) when one is given.
        Returns the local file path, or None when no signed URL is returned.
        """
        result = self._get(f"/partner/asset/download/media/{media_id}", SignedUrl)
        if not result:
            return None
        if result.local_path:
            download_path = os.path.join(download_path, result.local_path)
            os.makedirs(download_path, exist_ok=True)
        return self._download_url(result.url, download_path)

    def download_media_by_license(
        self, license_id: str, asset_id: Optional[str] = None, download_path: str = ""
    ) -> List[str]:
        """
        downloads all media for a license

        Each file goes under ``download_path``, inside its ``local_path``
        sub-directory (created if needed) when one is given. Returns the local
        paths of the downloaded files (empty when no signed URLs are returned).
        """
        # NOTE(review): with asset_id=None the request path ends in ".../asset/None";
        # confirm the server accepts that or whether a license-only route is intended.
        result = self._get(
            f"/partner/asset/download/license/{license_id}/asset/{asset_id}", SignedUrls
        )
        paths: List[str] = []
        if result:
            for signed_url in result.urls:
                local_path = download_path
                if signed_url.local_path:
                    local_path = os.path.join(local_path, signed_url.local_path)
                    os.makedirs(local_path, exist_ok=True)
                paths.append(self._download_url(signed_url.url, local_path))

        return paths

    def prepare_zip_license(self, license_id: str) -> Optional[MultiFileDownload]:
        """
        prepare a download zip of the licensed data
        """
        return self._get(
            f"/partner/asset/license/{license_id}/prepare/zip", MultiFileDownload
        )

    def status_zip(self, download_id: str) -> Optional[MultiFileDownload]:
        """
        get status of download
        """
        # Single slash before "zip", matching the other zip endpoints in this client.
        return self._get(
            f"/partner/asset/download/status/{download_id}/zip", MultiFileDownload
        )

    def download_zip(self, download_id: str, download_path: str = "") -> Optional[str]:
        """
        downloads all the media for a list of assets

        Returns the local path of the downloaded file, or None when no signed URL
        is returned.
        """
        result = self._get(f"/partner/asset/download/bulk/{download_id}/zip", SignedUrl)
        if result:
            return self._download_url(result.url, download_path)

        return None

    def download_license_zip(
        self, license_id: str, download_path: str = ""
    ) -> Optional[str]:
        """
        downloads all the media for a license as a zip file

        Returns the local path of the zip, or None when no signed URL is returned.
        """
        result = self._get(
            f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
        )
        if result:
            return self._download_url(result.url, download_path)

        return None

    def bulk_download_license_zip(
        self, license_id: str, download_path: str = ""
    ) -> Optional[str]:
        """
        downloads all the media for a license

        Same endpoint and behaviour as ``download_license_zip``.
        """
        return self.download_license_zip(license_id, download_path)

    def prepare_license(self, license_id: str) -> Optional[BulkRequest]:
        """
        prepare for download of licensed data
        """
        return self._get(f"/partner/asset/license/{license_id}/prepare", BulkRequest)

    def remaining_n_downloads(
        self, download_id: str, top: int
    ) -> Optional[DownloadResults]:
        """
        get remaining bulk downloads, limited by ``top``

        ``top`` is presumably the maximum number of results to return — confirm
        with the API.
        """
        return self._get(
            f"/partner/asset/download/remaining/{download_id}/top/{top}",
            DownloadResults,
        )

    def remaining_downloads(self, download_id: str) -> Optional[DownloadResults]:
        """
        get remaining bulk downloads
        """
        return self._get(
            f"/partner/asset/download/remaining/{download_id}", DownloadResults
        )

    def media_access(self, media_access_id: str) -> Optional[DownloadResult]:
        """
        get media access
        """
        return self._get(
            f"/partner/asset/media/access/{media_access_id}", DownloadResult
        )

    def mark_downloaded(self, media_access_id: str) -> Optional[DownloadResult]:
        """
        mark media_access as downloaded
        """
        return self._put(
            f"/partner/asset/mark/media/access/downloaded/{media_access_id}",
            None,
            DownloadResult,
        )

    def download_status(self, download_id: str) -> Optional[MultiFileDownload]:
        """
        get the status of a download
        """
        return self._get(
            f"/partner/asset/download/{download_id}/status", MultiFileDownload
        )

    def bulk_download_license(self, license_id: str, download_path: str = ""):
        """
        bulk download license

        Prepares a bulk download for ``license_id``, then repeatedly fetches the
        remaining files. Each file is downloaded under ``download_path`` (inside its
        ``local_path`` sub-directory when given) and then marked as downloaded. This
        repeats until no remaining files are reported. Always returns None.

        Raises RuntimeError if the server returns no bulk request or no
        remaining-downloads result.
        """
        request = self._or_error(
            self.prepare_license(license_id),
            RuntimeError(f"Failed to prepare bulk download for license '{license_id}'"),
        )

        # Termination relies on mark_downloaded() removing each file from the
        # server's "remaining" list.
        while True:
            files = self._or_error(
                self.remaining_downloads(request.id),
                RuntimeError(f"Failed to fetch remaining downloads for '{request.id}'"),
            )
            if not files.results:
                break
            for item in files.results:
                local_path = download_path
                if item.signed_url.local_path:
                    local_path = os.path.join(local_path, item.signed_url.local_path)
                    os.makedirs(local_path, exist_ok=True)

                self._download_url(item.signed_url.url, local_path)
                self._log.info("Marking %s as downloaded", item.id)
                self.mark_downloaded(item.id)

        return None

bulk_download_license

bulk_download_license(
    license_id: str, download_path: str = ""
)

Bulk-download every file of a license, marking each file as downloaded after it has been saved.

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def bulk_download_license(self, license_id: str, download_path: str = ""):
    """
    bulk download license

    Prepares a bulk download for ``license_id``, then repeatedly fetches the
    remaining files. Each file is downloaded under ``download_path`` (inside its
    ``local_path`` sub-directory when given) and then marked as downloaded. This
    repeats until no remaining files are reported. Always returns None.

    Raises RuntimeError if the server returns no bulk request or no
    remaining-downloads result.
    """
    request = self._or_error(
        self.prepare_license(license_id),
        RuntimeError(f"Failed to prepare bulk download for license '{license_id}'"),
    )

    # Termination relies on mark_downloaded() removing each file from the
    # server's "remaining" list.
    while True:
        files = self._or_error(
            self.remaining_downloads(request.id),
            RuntimeError(f"Failed to fetch remaining downloads for '{request.id}'"),
        )
        if not files.results:
            break
        for item in files.results:
            local_path = download_path
            if item.signed_url.local_path:
                local_path = os.path.join(local_path, item.signed_url.local_path)
                os.makedirs(local_path, exist_ok=True)

            self._download_url(item.signed_url.url, local_path)
            self._log.info("Marking %s as downloaded", item.id)
            self.mark_downloaded(item.id)

    return None

bulk_download_license_zip

bulk_download_license_zip(
    license_id: str, download_path: str = ""
)

downloads all the media for a license as a zip file and returns its local path (or None if no download URL is returned)

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def bulk_download_license_zip(
    self, license_id: str, download_path: str = ""
) -> Optional[str]:
    """
    downloads all the media for a license

    Same endpoint and behaviour as ``download_license_zip``: returns the local
    path of the downloaded zip, or None when no signed URL is returned.
    """
    return self.download_license_zip(license_id, download_path)

download

download(media_id: str, download_path: str = '') -> str

downloads a specific media

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
63
64
65
66
67
68
69
70
71
72
73
74
def download(self, media_id: str, download_path: str = "") -> Optional[str]:
    """
    downloads a specific media

    The file is written under ``download_path``, inside the server-provided
    ``local_path`` sub-directory (created if needed) when one is given.
    Returns the local file path, or None when no signed URL is returned.
    """
    result = self._get(f"/partner/asset/download/media/{media_id}", SignedUrl)
    if not result:
        return None
    if result.local_path:
        download_path = os.path.join(download_path, result.local_path)
        os.makedirs(download_path, exist_ok=True)
    return self._download_url(result.url, download_path)

download_license_zip

download_license_zip(
    license_id: str, download_path: str = ""
)

downloads all the media for a license as a zip file and returns its local path (or None if no download URL is returned)

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def download_license_zip(
    self, license_id: str, download_path: str = ""
) -> Optional[str]:
    """
    downloads all the media for a license as a zip file

    The file name comes from the signed URL's path. Returns the local path of the
    zip, or None when no signed URL is returned.
    """
    result = self._get(
        f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
    )
    if result:
        # _download_url does the filename / join / download steps this method
        # used to repeat inline.
        return self._download_url(result.url, download_path)

    return None

download_media_by_license

download_media_by_license(
    license_id: str,
    asset_id: str = None,
    download_path: str = "",
) -> str

downloads all media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def download_media_by_license(
    self, license_id: str, asset_id: Optional[str] = None, download_path: str = ""
) -> List[str]:
    """
    downloads all media for a license

    Each file goes under ``download_path``, inside its ``local_path``
    sub-directory (created if needed) when one is given. Returns the local
    paths of the downloaded files (empty when no signed URLs are returned).
    """
    # NOTE(review): with asset_id=None the request path ends in ".../asset/None";
    # confirm the server accepts that or whether a license-only route is intended.
    result = self._get(
        f"/partner/asset/download/license/{license_id}/asset/{asset_id}", SignedUrls
    )
    paths: List[str] = []
    if result:
        for signed_url in result.urls:
            local_path = download_path
            if signed_url.local_path:
                local_path = os.path.join(local_path, signed_url.local_path)
                os.makedirs(local_path, exist_ok=True)
            paths.append(self._download_url(signed_url.url, local_path))

    return paths

download_status

download_status(download_id: str)

get the status of a download

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
198
199
200
201
202
203
204
def download_status(self, download_id: str):
    """
    get the status of a download

    Returns the parsed ``MultiFileDownload`` for ``download_id``, or None when
    ``_get`` returns nothing.
    """
    return self._get(
        f"/partner/asset/download/{download_id}/status", MultiFileDownload
    )

download_zip

download_zip(download_id, download_path: str = '')

downloads all the media for a list of assets

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
113
114
115
116
117
118
119
120
121
def download_zip(self, download_id: str, download_path: str = "") -> Optional[str]:
    """
    downloads all the media for a list of assets

    Returns the local path of the downloaded file, or None when no signed URL is
    returned.
    """
    result = self._get(f"/partner/asset/download/bulk/{download_id}/zip", SignedUrl)
    if result:
        return self._download_url(result.url, download_path)

    return None

mark_downloaded

mark_downloaded(media_access_id: str) -> DownloadResult

mark media_access as downloaded

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
188
189
190
191
192
193
194
195
196
def mark_downloaded(self, media_access_id: str) -> Optional[DownloadResult]:
    """
    mark media_access as downloaded

    Sends a PUT with no data. Returns the parsed ``DownloadResult``, or None when
    ``_put`` returns nothing.
    """
    return self._put(
        f"/partner/asset/mark/media/access/downloaded/{media_access_id}",
        None,
        DownloadResult,
    )

media_access

media_access(media_access_id: str) -> DownloadResult

get media access

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
180
181
182
183
184
185
186
def media_access(self, media_access_id: str) -> Optional[DownloadResult]:
    """
    get media access

    Returns the parsed ``DownloadResult``, or None when ``_get`` returns nothing.
    """
    return self._get(
        f"/partner/asset/media/access/{media_access_id}", DownloadResult
    )

prepare_license

prepare_license(license_id: str)

prepare for download of licensed data

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
157
158
159
160
161
def prepare_license(self, license_id: str):
    """
    Ask the server to prepare a bulk download of the data licensed under
    ``license_id``.

    Returns the parsed ``BulkRequest`` (or None when ``_get`` returns nothing);
    its ``id`` is what ``remaining_downloads`` is called with.
    """
    endpoint = f"/partner/asset/license/{license_id}/prepare"
    return self._get(endpoint, BulkRequest)

prepare_zip_license

prepare_zip_license(license_id: str)

prepare a download zip of licensed data

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
 97
 98
 99
100
101
102
103
def prepare_zip_license(self, license_id: str):
    """
    Prepare a zip download of the data licensed under ``license_id``.

    Returns the parsed ``MultiFileDownload`` (or None when ``_get`` returns
    nothing).
    """
    endpoint = f"/partner/asset/license/{license_id}/prepare/zip"
    return self._get(endpoint, MultiFileDownload)

remaining_downloads

remaining_downloads(download_id: str) -> DownloadResults

get remaining bulk downloads

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
172
173
174
175
176
177
178
def remaining_downloads(self, download_id: str) -> Optional[DownloadResults]:
    """
    get remaining bulk downloads

    Returns the parsed ``DownloadResults`` for ``download_id``, or None when
    ``_get`` returns nothing.
    """
    return self._get(
        f"/partner/asset/download/remaining/{download_id}", DownloadResults
    )

remaining_n_downloads

remaining_n_downloads(
    download_id: str, top: int
) -> DownloadResults

get remaining bulk downloads

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
163
164
165
166
167
168
169
170
def remaining_n_downloads(
    self, download_id: str, top: int
) -> Optional[DownloadResults]:
    """
    get remaining bulk downloads, limited by ``top``

    ``top`` is presumably the maximum number of results to return — confirm with
    the API. Returns None when ``_get`` returns nothing.
    """
    return self._get(
        f"/partner/asset/download/remaining/{download_id}/top/{top}",
        DownloadResults,
    )

status_zip

status_zip(download_id: str)

get status of download

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
105
106
107
108
109
110
111
def status_zip(self, download_id: str):
    """
    get status of download

    Returns the parsed ``MultiFileDownload`` for ``download_id``, or None when
    ``_get`` returns nothing.
    """
    # Single slash before "zip", matching the other zip endpoints in this client.
    return self._get(
        f"/partner/asset/download/status/{download_id}/zip", MultiFileDownload
    )

VermillioBaseClient

Base Vermillio Client providing authentication mechanism via VermillioConfig

Source code in packages/core/src/vermillio/sdk/core/clients/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class VermillioBaseClient:
    """
    Base Vermillio Client providing authentication mechanism via VermillioConfig.

    Synchronous calls go through a ``requests`` ``Session`` with an
    ``OAuthTokenAdapter`` mounted on the base URL. Asynchronous calls go through
    an ``AsyncOAuth2Client`` that fetches a client-credentials token on first use.
    """

    def __init__(
        self,
        config: Optional[VermillioConfig] = None,
        base_url: Union[str, None] = None,
    ):
        """
        Args:
            config: Credentials and environment; ``VermillioConfig.default()``
                is used when omitted.
            base_url: Root URL for all request paths; ``config.trace_base_url``
                is used when omitted.
        """
        config = config or VermillioConfig.default()
        self._log = logging.getLogger(__name__)
        self._config = config
        self._session = Session()
        self._base_url = base_url or config.trace_base_url
        # The adapter is mounted on the base-URL prefix, so only requests to
        # this service go through it.
        self._session.mount(self._base_url, OAuthTokenAdapter(config))
        self._async_client = AsyncOAuth2Client(
            config.client_id,
            config.client_secret,
            token_endpoint_auth_method="client_secret_post",
        )

    def _encode(self, value: str) -> str:
        """Percent-encode ``value`` for use in a URL path, keeping ``/`` unescaped."""
        return url_encode(value, safe="/", encoding=None, errors=None)

    def _url(self, path: str):
        """Build the absolute URL for ``path`` against this client's base URL."""
        return _url(self._base_url, path)

    def _json(self, data: Union[Dict, BaseModel]):
        """Return ``data`` unchanged if it is a dict, else its pydantic ``model_dump()``."""
        return data if isinstance(data, dict) else data.model_dump()

    def _or_error(
        self, result: Optional[BaseModelType], error: Exception
    ) -> BaseModelType:
        """Return ``result``, raising ``error`` instead when it is None."""
        if result is None:
            raise error
        return result

    def _get(
        self,
        path: str,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """GET ``path`` and parse the body as ``response_type`` (None on 404)."""
        return self._request(path, "GET", response_type, timeout, params)

    def _put(
        self,
        path: str,
        data: Union[Dict, BaseModel],
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """PUT ``data`` as JSON (and/or ``files``) to ``path`` (None on 404)."""
        return self._request(path, "PUT", response_type, timeout, params, data, files)

    def _post(
        self,
        path: str,
        data: Union[Dict, BaseModel],
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """POST ``data`` as JSON (and/or ``files``) to ``path`` (None on 404)."""
        return self._request(path, "POST", response_type, timeout, params, data, files)

    async def _async_post(
        self,
        path: str,
        data: Any,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[Union[str, Dict, BaseModelType]]:
        """Async POST of ``data`` as the raw request body; see ``_async_request``."""
        return await self._async_request(
            path, "POST", response_type, timeout, params, data, files
        )

    def _delete(
        self,
        path: str,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """DELETE ``path`` and parse the body as ``response_type`` (None on 404)."""
        return self._request(path, "DELETE", response_type, timeout, params)

    async def _async_request(
        self,
        path: str,
        method: HttpMethod,
        response_type: Type[BaseModelType],
        timeout: Optional[float] = None,
        params: Optional[Dict] = None,
        body: Optional[Any] = None,
        files: Optional[Dict[str, IO]] = None,
    ) -> Optional[Union[str, Dict, BaseModelType]]:
        """
        Send an authenticated request with the async OAuth client.

        A client-credentials token is fetched on the first call (when the
        client has no token yet). ``body`` is sent as the raw ``data`` payload.

        Returns:
            None on 404. Otherwise the body parsed as ``response_type``, or, if
            that fails, the decoded JSON, or, failing that, the raw text.
        Raises:
            The HTTP client's status error for any other non-2xx response.
        """
        url = self._url(path)
        self._log.debug("Sending %s request to url %s", method, url)

        if not self._async_client.token:
            self._async_client.token = await self._async_client.fetch_token(
                _url(self._config.auth_base_url, "/oauth2/token"),
                grant_type="client_credentials",
                audience=self._config.audience,
            )
        res = await self._async_client.request(
            method,
            url,
            timeout=timeout,
            params=params,
            data=body,
            files=files,
        )
        self._log.debug(
            "Received %d for %s to url %s",
            res.status_code,
            res.request.method,
            res.request.url,
        )
        if res.status_code == 404:
            return None
        if not 200 <= res.status_code < 300:
            self._log.debug("Response: %s", res.content)
            res.raise_for_status()
        # Fall back through progressively looser decodings of the body.
        try:
            return response_type.model_validate_json(res.content)
        except Exception:
            try:
                return res.json()
            except Exception:
                return res.text

    def _request(
        self,
        path: str,
        method: HttpMethod,
        response_type: Type[BaseModelType],
        timeout: Optional[float] = None,
        params: Optional[Dict] = None,
        body: Optional[Union[Dict, BaseModel]] = None,
        files: Optional[Dict[str, IO]] = None,
    ) -> Optional[BaseModelType]:
        """
        Send an authenticated request through the OAuth-mounted session.

        ``body`` (when truthy) is sent as JSON; ``params`` become query
        parameters.

        Returns:
            None on 404, otherwise the body parsed as ``response_type``.
        Raises:
            requests.HTTPError: For any other non-2xx response.
        """
        url = self._url(path)
        # Format args are passed individually (not as one tuple) so logging
        # can fill every placeholder.
        self._log.debug("Sending %s request to url %s", method, url)
        res = self._session.request(
            method,
            url,
            timeout=timeout,
            params=params,
            json=self._json(body) if body else None,
            files=files,
        )
        self._log.debug(
            "Received %d for %s to url %s",
            res.status_code,
            res.request.method,
            res.request.url,
        )
        if res.status_code == 404:
            return None
        if not res.ok:
            self._log.debug("Response: %s", res.content)
            res.raise_for_status()
        return response_type.model_validate_json(res.content)

    def _download_file(self, url: str, local_filename: str) -> bool:
        """
        Download ``url`` to ``local_filename`` using requests, with a tqdm progress bar.

        Uses a plain ``requests.get`` (not the OAuth-mounted session) and
        streams the body to disk in 1 KiB chunks.

        Returns:
            bool: True on success. False if the request or write fails, or if
            the server sent a non-zero ``content-length`` that does not match
            the number of bytes received. Failures are logged, not raised.
        """
        try:
            with requests.get(url, stream=True) as r:
                r.raise_for_status()
                total_size_in_bytes = int(r.headers.get("content-length", 0))
                block_size = 1024  # 1 Kibibyte

                with tqdm(
                    total=total_size_in_bytes,
                    unit="iB",
                    unit_scale=True,
                    desc=local_filename,
                ) as progress_bar:
                    with open(local_filename, "wb") as file:
                        for data in r.iter_content(block_size):
                            progress_bar.update(len(data))
                            file.write(data)
                    received = progress_bar.n
        except Exception:
            # Deliberately best-effort (callers only get a bool), but keep the
            # cause visible instead of discarding it.
            self._log.warning(
                "Failed to download %s to %s", url, local_filename, exc_info=True
            )
            return False

        if total_size_in_bytes != 0 and received != total_size_in_bytes:
            self._log.warning(
                "Incomplete download of %s: received %d of %d bytes",
                url,
                received,
                total_size_in_bytes,
            )
            return False
        return True

VermillioBasePipelineClient

Bases: VermillioBaseClient

Base Client for interacting with External Pipeline API.

Source code in packages/core/src/vermillio/sdk/core/clients/pipelines.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class VermillioBasePipelineClient(VermillioBaseClient):
    """
    Base Client for interacting with External Pipeline API.

    All paths are resolved under ``/external/pipeline/{name}`` on
    ``config.external_pipeline_url``. The helpers take the pydantic model
    classes to parse responses into, so subclasses can expose typed public
    methods on top of them.
    """

    def __init__(self, name: str, config: Optional[VermillioConfig] = None):
        """
        Args:
            name: Pipeline name; URL-encoded once and embedded in every path.
            config: Credentials and environment; ``VermillioConfig.default()``
                is used when omitted.
        """
        config = config or VermillioConfig.default()
        self._name = self._encode(name)
        # Uploads go through the asset client, which targets the default
        # (trace) base URL rather than the pipeline URL.
        self._assets = VermillioAssets(config)
        super().__init__(config, config.external_pipeline_url)

    def _url(self, path: str) -> str:
        """Scope ``path`` under this pipeline before resolving it against the base URL."""
        return super()._url(f"/external/pipeline/{self._name}{path}")

    def _pipeline(self, pipeline_cls: type[BaseModelType]) -> Optional[BaseModelType]:
        """Fetch this pipeline's description, or None on a 404."""
        return self._get("", pipeline_cls)

    def _upload_sources(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
    ) -> list[ExternalSource]:
        """
        Upload each file as an asset and wrap every resulting media id in an
        ``ExternalSource``.

        Raises:
            ValueError: If ``file_paths`` and ``titles`` differ in length.
            RuntimeError: If an upload returns no asset.
        """
        if len(file_paths) != len(titles):
            raise ValueError(f"file_paths and titles must have equal length ({len(file_paths)} != {len(titles)}).")
        media_ids = []
        for file_path, title in zip(file_paths, titles):
            asset = self._or_error(
                self._assets.upload(file_path, asset_type, title),
                RuntimeError(f"Upload of '{file_path}' returned no asset."),
            )
            # A single uploaded file may yield several media ids.
            media_ids.extend(asset.media_ids)
        return [ExternalSource(media_id=media_id) for media_id in media_ids]

    def _upload_and_load(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
    ) -> Optional[list[BaseModelType]]:
        """
        Upload local files as assets and load their media into the pipeline.

        Args:
            file_paths: Local files to upload.
            asset_type: Asset type applied to every upload.
            titles: One title per file, in the same order as ``file_paths``.
            source_cls: Model class for each returned source.
            context: Pipeline context; the pipeline's default when None.
        Returns:
            The created sources, or None on a 404.
        Raises:
            ValueError: If ``file_paths`` and ``titles`` differ in length.
        """
        sources = self._upload_sources(file_paths, asset_type, titles)
        return self._load(PipelineLoadRequest(context=context, sources=sources), source_cls)

    def _load(
        self, load_request: PipelineLoadRequest, source_cls: type[BaseModelType]
    ) -> Optional[list[BaseModelType]]:
        """POST ``load_request`` and return the created sources (None on 404)."""
        res = self._post("", load_request, RootModel[list[source_cls]])
        return res.root if res else None

    def _load_results(
        self,
        load_request: PipelineLoadRequest,
        source_cls: type[BaseModelType],
        results_cls: type[BaseModelType],
        wait: bool = True,
    ) -> Optional[list[BaseModelType]]:
        """
        Load sources, then fetch results for each created source.

        Returns one results object per source (optionally waiting for each to
        finish), or None when the load produced no sources.
        """
        sources = self._load(load_request, source_cls)
        if not sources:
            return None
        return [
            self._results(cast(PipelineSource, s).id, results_cls, load_request.context, wait)
            for s in sources
        ]

    def _upload_and_load_results(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
        source_cls: type[BaseModelType],
        results_cls: type[BaseModelType],
        context: Optional[str] = None,
        wait: bool = True,
    ) -> Optional[list[BaseModelType]]:
        """
        Upload local files, load them into the pipeline and fetch their results.

        Raises:
            ValueError: If ``file_paths`` and ``titles`` differ in length.
        """
        sources = self._upload_sources(file_paths, asset_type, titles)
        return self._load_results(
            PipelineLoadRequest(context=context, sources=sources),
            source_cls,
            results_cls,
            wait,
        )

    def _run(
        self,
        run_request: PipelineRunRequest,
        source_cls: type[BaseModelType],
        timeout: float = 60.0,
        poll_rate: float = 1.0,
    ) -> Optional[BaseModelType]:
        """POST to ``/run``; ``timeout`` and ``poll_rate`` are sent as query parameters."""
        # TODO: bounds on params/validate?
        return self._post(
            "/run",
            run_request,
            source_cls,
            {"timeout": timeout, "poll_rate": poll_rate},
        )

    # TODO: run indefinitely local polling?

    def _run_results(
        self,
        run_request: PipelineRunRequest,
        results_cls: type[BaseModelType],
        timeout: float = 60.0,
        poll_rate: float = 1.0,
    ) -> Optional[BaseModelType]:
        """POST to ``/run/results``; ``timeout`` and ``poll_rate`` are sent as query parameters."""
        # TODO: bounds on params/validate?
        return self._post(
            "/run/results",
            run_request,
            results_cls,
            {"timeout": timeout, "poll_rate": poll_rate},
        )

    def _status(
        self, status_cls: type[BaseModelType], context: Optional[str] = None
    ) -> Optional[BaseModelType]:
        """Fetch the pipeline status for ``context`` (None on 404)."""
        return self._get("/status", status_cls, {"context": context})

    def _sources(
        self,
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
        after_id: Optional[str] = None,
        limit: int = 10,
    ) -> Optional[list[BaseModelType]]:
        """List up to ``limit`` sources, starting after ``after_id`` (None on 404)."""
        # TODO: validate params
        res = self._get(
            "/sources",
            RootModel[list[source_cls]],
            {"context": context, "after_id": after_id, "limit": limit},
        )
        return res.root if res else None

    def _source(
        self,
        id_or_source_id: str,
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
    ) -> Optional[BaseModelType]:
        """Fetch one source by pipeline id or caller-supplied source id (None on 404)."""
        return self._get(
            f"/source/{self._encode(id_or_source_id)}", source_cls, {"context": context}
        )

    def _results(
        self,
        id_or_source_id: str,
        results_cls: type[BaseModelType],
        context: Optional[str] = None,
        wait: bool = False,
        poll_rate: float = 1.0,
        max_wait: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """
        Fetch the results for a source, optionally polling until finished.

        Args:
            id_or_source_id: Pipeline source id or caller-supplied source id.
            results_cls: Model class to parse the results into.
            context: Pipeline context; the pipeline's default when None.
            wait: If True, keep polling while the result is a ``PipelineResults``
                that is not ``is_finished``.
            poll_rate: Seconds to sleep between polls.
            max_wait: Stop polling after this many seconds and return the last
                result; None (default) polls without limit.
        Returns:
            The latest results, or None on a 404.
        """
        path = f"/source/{self._encode(id_or_source_id)}/results"
        params = {"context": context}
        deadline = None if max_wait is None else time.monotonic() + max_wait

        result = self._get(path, results_cls, params)
        while (
            wait
            and isinstance(result, PipelineResults)
            and not result.is_finished
            and (deadline is None or time.monotonic() < deadline)
        ):
            time.sleep(poll_rate)
            result = self._get(path, results_cls, params)
        return result

VermillioConfig pydantic-model

Bases: OAuthCredentials

Fields:

  • client_id (str)
  • client_secret (str)
  • audience (str)
  • env (str)
Source code in packages/core/src/vermillio/sdk/core/models/config.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class VermillioConfig(OAuthCredentials):
    """
    SDK configuration: OAuth client credentials plus the target environment.

    ``env`` selects which deployment the derived service URLs point at; each
    URL can also be overridden with an environment variable.
    """

    # "prod" targets production; any other value is embedded in the non-prod
    # trace / external-pipeline hostnames.
    env: str = "prod"

    @classmethod
    def default(cls):
        """
        Gets the default global config.
        If none is set, it is created with `from_env`.
        Returns:
            VermillioConfig: The default VermillioConfig object.
        """
        global _default
        if _default is None:
            _default = cls.from_env()
        return _default

    @classmethod
    def set_default(cls, config: "VermillioConfig"):
        """
        Sets the default global config, used when no config is provided to a client.
        """
        # Must write the same module-level ``_default`` that ``default()`` reads;
        # a class attribute would be invisible to ``default()``.
        global _default
        _default = config

    @classmethod
    def from_env(cls, env_prefix: str = "VERMILLIO_SDK_") -> "VermillioConfig":
        """
        Extracts a VermillioConfig from environment variables with the specified prefix.
        Args:
            env_prefix (str): Prefix used to construct final environment vars:
                              client_id = f"{env_prefix}CLIENT_ID"
                              client_secret = f"{env_prefix}CLIENT_SECRET"
                              env = f"{env_prefix}ENV" (optional, defaults to "prod")
        Raises:
            AssertionError: If a required variable is missing or empty.
                NOTE(review): these are ``assert`` checks, so they are skipped
                under ``python -O``.
        """
        key = f"{env_prefix}CLIENT_ID"
        client_id = os.environ.get(key)
        assert client_id, f"Missing required env var: {key}"
        key = f"{env_prefix}CLIENT_SECRET"
        client_secret = os.environ.get(key)
        assert client_secret, f"Missing required env var: {key}"
        env = os.environ.get(f"{env_prefix}ENV", "prod")
        return cls(
            env=env,
            client_id=client_id,
            client_secret=client_secret,
        )

    @classmethod
    def credentials(
        cls, client_id: str, client_secret: str, env: str = "prod"
    ) -> "VermillioConfig":
        """Build a config directly from explicit credentials."""
        # ``cls`` (not a hard-coded class name) so subclasses get instances of
        # themselves.
        return cls(
            env=env,
            client_id=client_id,
            client_secret=client_secret,
        )

    @property
    def prod(self) -> bool:
        """
        Whether or not this is targeting production environment.
        Returns:
            bool: True if pointed at production, False otherwise.
        """
        return self.env == "prod"

    @property
    def auth_base_url(self) -> str:
        """
        OAuth server URL; overridable via ``VERMILLIO_AUTH_URL``.

        Unlike the other URLs, every non-prod ``env`` uses the staging auth host.
        """
        url = (
            "https://auth.cloud.vermill.io"
            if self.prod
            else "https://staging.auth.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_AUTH_URL", url)

    @property
    def trace_base_url(self) -> str:
        """Trace application URL for ``env``; overridable via ``VERMILLIO_TRACE_BASE_URL``."""
        url = (
            "https://application.tce.cloud.vermill.io"
            if self.prod
            else f"https://application.tce.{self.env}.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_TRACE_BASE_URL", url)

    @property
    def external_pipeline_url(self) -> str:
        """External pipeline URL for ``env``; overridable via ``VERMILLIO_EXTERNAL_PIPELINE_URL``."""
        url = (
            "https://external-pipeline.tce.cloud.vermill.io"
            if self.prod
            else f"https://external-pipeline.tce.{self.env}.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_EXTERNAL_PIPELINE_URL", url)

prod property

prod: bool

Whether this config targets the production environment. Returns: bool: True if pointed at production, False otherwise.

default classmethod

default()

Gets the default global config. If none is set, it is created with from_env. Returns: VermillioConfig: The default VermillioConfig object.

Source code in packages/core/src/vermillio/sdk/core/models/config.py
10
11
12
13
14
15
16
17
18
19
20
21
@classmethod
def default(cls):
    """
    Return the process-wide default config, creating it lazily.

    The first call with no default stored builds one with ``from_env`` and
    caches it in the module-level ``_default``; later calls return that cached
    object.
    Returns:
        VermillioConfig: The default VermillioConfig object.
    """
    global _default
    if _default is not None:
        return _default
    _default = cls.from_env()
    return _default

from_env classmethod

from_env(
    env_prefix: str = "VERMILLIO_SDK_",
) -> VermillioConfig

Extracts a VermillioConfig from environment variables with the specified prefix. Args: env_prefix (str): Prefix used to construct the final environment variable names: client_id = f"{env_prefix}CLIENT_ID", client_secret = f"{env_prefix}CLIENT_SECRET".

Source code in packages/core/src/vermillio/sdk/core/models/config.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def from_env(cls, env_prefix: str = "VERMILLIO_SDK_") -> "VermillioConfig":
    """
    Build a VermillioConfig from prefixed environment variables.

    Reads ``{env_prefix}CLIENT_ID`` and ``{env_prefix}CLIENT_SECRET`` (both
    required and non-empty) and ``{env_prefix}ENV`` (optional, "prod" when
    unset).
    Args:
        env_prefix (str): Prefix prepended to each variable name.
    Raises:
        AssertionError: When a required variable is missing or empty.
    """
    values = {}
    for field_name, suffix in (("client_id", "CLIENT_ID"), ("client_secret", "CLIENT_SECRET")):
        var_name = f"{env_prefix}{suffix}"
        values[field_name] = os.environ.get(var_name)
        assert values[field_name], f"Missing required env var: {var_name}"
    return cls(env=os.environ.get(f"{env_prefix}ENV", "prod"), **values)

set_default classmethod

set_default(config: VermillioConfig)

Sets the default global config, used when no config is provided to a client.

Source code in packages/core/src/vermillio/sdk/core/models/config.py
23
24
25
26
27
28
@classmethod
def set_default(cls, config: "VermillioConfig"):
    """
    Sets the default global config, used when no config is provided to a client.

    Writes the module-level ``_default`` that ``default()`` reads; assigning a
    class attribute instead would leave ``default()`` unaffected.
    """
    global _default
    _default = config

clients

VermillioAssets

Bases: VermillioBaseClient

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class VermillioAssets(VermillioBaseClient):
    """
    Client for the partner asset endpoints: listing, uploading and downloading
    assets and licensed media.
    """

    def list(self) -> List[PartnerAsset]:
        """Return every partner asset (an empty list if the endpoint 404s)."""
        res = self._get("/partner/asset/list", RootModel[List[PartnerAsset]])
        return res.root if res else []

    def get(self, id: str) -> Optional[PartnerAsset]:
        """Return the partner asset ``id``, or None if it is not found."""
        return self._get(f"/partner/asset/{id}", PartnerAsset)

    def upload(
        self,
        file_path: str,
        asset_type: AssetType,
        title: str,
        parent_asset_id: Optional[str] = None,
    ) -> PartnerAsset:
        """
        Upload a local file as a new partner asset.

        Args:
            file_path: Local file to upload; sent as a multipart part named
                after the file's basename.
            asset_type: Sent as the ``type`` query parameter.
            title: Sent as the ``title`` query parameter.
            parent_asset_id: Optional ``parentAssetId`` query parameter.
        Returns:
            The created asset (None if the endpoint 404s).
        Raises:
            AssertionError: If ``file_path`` is not an existing file.
        """
        assert os.path.isfile(file_path), f"Failed to find file: '{file_path}'"
        with open(file_path, "rb") as f:
            return self._post(
                "/partner/asset/upload",
                None,
                PartnerAsset,
                {
                    "type": asset_type,
                    "title": title,
                    **({"parentAssetId": parent_asset_id} if parent_asset_id else {}),
                },
                {(os.path.basename(file_path)): f},
            )

    def _get_filename(self, file_name: str):
        """Return the last path component of a URL, ignoring query and fragment."""
        parsed_url = urlparse(file_name)

        return Path(parsed_url.path).name

    def _download_url(self, url: str, download_path: str):
        """Download ``url`` into directory ``download_path``; return the local file path."""
        filename = self._get_filename(url)
        path = os.path.join(download_path, filename)

        self._download_file(url, path)

        return path

    def download(self, media_id: str, download_path: str = "") -> Optional[str]:
        """
        Download a single media file.

        Args:
            media_id: Media to download.
            download_path: Directory to save into; a server-supplied
                ``local_path`` is appended and created if missing.
        Returns:
            The local file path, or None if the endpoint 404s.
        """
        result = self._get(f"/partner/asset/download/media/{media_id}", SignedUrl)
        if result:
            if result.local_path:
                download_path = os.path.join(download_path, result.local_path)
                os.makedirs(download_path, exist_ok=True)
            return self._download_url(result.url, download_path)

        return None

    def download_media_by_license(
        self, license_id: str, asset_id: Optional[str] = None, download_path: str = ""
    ) -> List[str]:
        """
        Download every media file of ``asset_id`` under license ``license_id``.

        Each file lands in ``download_path``, or in a server-supplied
        ``local_path`` sub-directory of it (created if needed).

        Returns:
            Local paths of the downloaded files; empty if the endpoint 404s.
        """
        # NOTE(review): asset_id defaults to None, which is formatted into the
        # path as the literal "None" — confirm the endpoint accepts that.
        result = self._get(
            f"/partner/asset/download/license/{license_id}/asset/{asset_id}", SignedUrls
        )
        urls = []
        if result:
            for signed_url in result.urls:
                local_path = download_path
                if signed_url.local_path:
                    local_path = os.path.join(local_path, signed_url.local_path)
                    os.makedirs(local_path, exist_ok=True)
                path = self._download_url(signed_url.url, local_path)
                urls.append(path)

        return urls

    def prepare_zip_license(self, license_id: str):
        """
        Prepare a download zip for licensed data.

        Returns the parsed ``MultiFileDownload``, or None if the endpoint 404s.
        """
        return self._get(
            f"/partner/asset/license/{license_id}/prepare/zip", MultiFileDownload
        )

    def status_zip(self, download_id: str):
        """
        Get the status of a prepared zip download (None if the endpoint 404s).
        """
        # Single "/" before "zip", matching the other download endpoints.
        return self._get(
            f"/partner/asset/download/status/{download_id}/zip", MultiFileDownload
        )

    def download_zip(self, download_id: str, download_path: str = ""):
        """
        Download the zip for bulk download ``download_id`` into ``download_path``.

        Returns the local file path, or None if the endpoint 404s.
        """
        result = self._get(f"/partner/asset/download/bulk/{download_id}/zip", SignedUrl)
        if result:
            return self._download_url(result.url, download_path)

        return None

    def download_license_zip(self, license_id: str, download_path: str = ""):
        """
        Download the zip of all media for a license into ``download_path``.

        Returns the local file path, or None if the endpoint 404s.
        """
        result = self._get(
            f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
        )
        return self._download_url(result.url, download_path) if result else None

    def bulk_download_license_zip(self, license_id: str, download_path: str = ""):
        """
        Download the zip of all media for a license into ``download_path``.

        Same endpoint and behavior as ``download_license_zip``.
        """
        return self.download_license_zip(license_id, download_path)

    def prepare_license(self, license_id: str):
        """
        Prepare a bulk (per-file) download of a license's media.

        Returns the ``BulkRequest`` whose ``id`` is used with
        ``remaining_downloads``, or None if the endpoint 404s.
        """
        return self._get(f"/partner/asset/license/{license_id}/prepare", BulkRequest)

    def remaining_n_downloads(
        self, download_id: str, top: int
    ) -> Optional[DownloadResults]:
        """Get at most ``top`` remaining files of a bulk download (None on 404)."""
        return self._get(
            f"/partner/asset/download/remaining/{download_id}/top/{top}",
            DownloadResults,
        )

    def remaining_downloads(self, download_id: str) -> Optional[DownloadResults]:
        """Get the remaining files of a bulk download (None on 404)."""
        return self._get(
            f"/partner/asset/download/remaining/{download_id}", DownloadResults
        )

    def media_access(self, media_access_id: str) -> Optional[DownloadResult]:
        """Get a media-access record (None on 404)."""
        return self._get(
            f"/partner/asset/media/access/{media_access_id}", DownloadResult
        )

    def mark_downloaded(self, media_access_id: str) -> Optional[DownloadResult]:
        """Mark a media-access record as downloaded via a body-less PUT (None on 404)."""
        return self._put(
            f"/partner/asset/mark/media/access/downloaded/{media_access_id}",
            None,
            DownloadResult,
        )

    def download_status(self, download_id: str):
        """
        Get the status of a download.

        Returns the parsed ``MultiFileDownload``, or None if the endpoint 404s.
        """
        return self._get(
            f"/partner/asset/download/{download_id}/status", MultiFileDownload
        )

    def bulk_download_license(self, license_id: str, download_path: str = ""):
        """
        Download every file of a license one by one, marking each as downloaded.

        Repeatedly fetches the remaining files of the prepared bulk request
        until none are left. A file is marked as downloaded only when its
        download succeeded. If a whole batch produces no successful download,
        the loop stops instead of retrying forever, and the failures are logged.

        Raises:
            ValueError: If the license could not be prepared for download.
        """
        request = self._or_error(
            self.prepare_license(license_id),
            ValueError(f"Failed to prepare license {license_id} for download."),
        )

        files = self.remaining_downloads(request.id)
        while files and files.results:
            marked = 0
            for file in files.results:
                filename = self._get_filename(file.signed_url.url)
                local_path = download_path
                if file.signed_url.local_path:
                    local_path = os.path.join(local_path, file.signed_url.local_path)
                    os.makedirs(local_path, exist_ok=True)
                path = os.path.join(local_path, filename)

                if not self._download_file(file.signed_url.url, path):
                    self._log.warning(
                        "Failed to download %s; not marking it as downloaded", file.id
                    )
                    continue
                print(f"Marking {file.id} as downloaded")
                self.mark_downloaded(file.id)
                marked += 1

            if not marked:
                # Nothing was marked, so re-fetching would presumably return
                # the same files; stop rather than loop forever.
                self._log.warning(
                    "Stopping bulk download %s: %d file(s) could not be downloaded",
                    request.id,
                    len(files.results),
                )
                break
            files = self.remaining_downloads(request.id)

        return None
bulk_download_license
bulk_download_license(
    license_id: str, download_path: str = ""
)

bulk download license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def bulk_download_license(self, license_id: str, download_path: str = ""):
    """
    Download every file of a license one by one, marking each as downloaded.

    Repeatedly fetches the remaining files of the prepared bulk request until
    none are left. A file is marked as downloaded only when its download
    succeeded. If a whole batch produces no successful download, the loop
    stops instead of retrying forever, and the failures are logged.

    Raises:
        ValueError: If the license could not be prepared for download.
    """
    request = self._or_error(
        self.prepare_license(license_id),
        ValueError(f"Failed to prepare license {license_id} for download."),
    )

    files = self.remaining_downloads(request.id)
    while files and files.results:
        marked = 0
        for file in files.results:
            filename = self._get_filename(file.signed_url.url)
            local_path = download_path
            if file.signed_url.local_path:
                local_path = os.path.join(local_path, file.signed_url.local_path)
                os.makedirs(local_path, exist_ok=True)
            path = os.path.join(local_path, filename)

            if not self._download_file(file.signed_url.url, path):
                self._log.warning(
                    "Failed to download %s; not marking it as downloaded", file.id
                )
                continue
            print(f"Marking {file.id} as downloaded")
            self.mark_downloaded(file.id)
            marked += 1

        if not marked:
            # Nothing was marked, so re-fetching would presumably return the
            # same files; stop rather than loop forever.
            self._log.warning(
                "Stopping bulk download %s: %d file(s) could not be downloaded",
                request.id,
                len(files.results),
            )
            break
        files = self.remaining_downloads(request.id)

    return None
bulk_download_license_zip
bulk_download_license_zip(
    license_id: str, download_path: str = ""
)

downloads all the media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def bulk_download_license_zip(self, license_id: str, download_path: str = ""):
    """
    Fetch the signed URL of a license's zipped media and save it locally.

    The file is written into ``download_path`` under the name taken from the
    URL. Returns the local path, or None when the endpoint yields nothing.
    """
    signed = self._get(
        f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
    )
    if not signed:
        return None
    target = os.path.join(download_path, self._get_filename(signed.url))
    self._download_file(signed.url, target)
    return target
download
download(media_id: str, download_path: str = '') -> str

downloads a specific media

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
63
64
65
66
67
68
69
70
71
72
73
74
def download(self, media_id: str, download_path: str = "") -> Optional[str]:
    """
    Download a single media file.

    Args:
        media_id: Media to download.
        download_path: Directory to save into; a server-supplied
            ``local_path`` is appended and created if missing.
    Returns:
        The local file path, or None if the endpoint 404s.
    """
    result = self._get(f"/partner/asset/download/media/{media_id}", SignedUrl)
    if result:
        if result.local_path:
            download_path = os.path.join(download_path, result.local_path)
            os.makedirs(download_path, exist_ok=True)
        return self._download_url(result.url, download_path)

    return None
download_license_zip
download_license_zip(
    license_id: str, download_path: str = ""
)

downloads all the media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def download_license_zip(self, license_id: str, download_path: str = ""):
    """
    Save the zip of all media for a license into ``download_path``.

    The local file name is derived from the signed URL. Returns the local
    path, or None when the endpoint yields nothing.
    """
    if not (signed := self._get(
        f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
    )):
        return None
    destination = os.path.join(download_path, self._get_filename(signed.url))
    self._download_file(signed.url, destination)
    return destination
download_media_by_license
download_media_by_license(
    license_id: str,
    asset_id: str = None,
    download_path: str = "",
) -> str

downloads all media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def download_media_by_license(
    self, license_id: str, asset_id: Optional[str] = None, download_path: str = ""
) -> List[str]:
    """
    Download every media file of ``asset_id`` under license ``license_id``.

    Each file lands in ``download_path``, or in a server-supplied
    ``local_path`` sub-directory of it (created if needed).

    Returns:
        Local paths of the downloaded files; empty if the endpoint 404s.
    """
    # NOTE(review): asset_id defaults to None, which is formatted into the
    # path as the literal "None" — confirm the endpoint accepts that.
    result = self._get(
        f"/partner/asset/download/license/{license_id}/asset/{asset_id}", SignedUrls
    )
    urls = []
    if result:
        for signed_url in result.urls:
            local_path = download_path
            if signed_url.local_path:
                local_path = os.path.join(local_path, signed_url.local_path)
                os.makedirs(local_path, exist_ok=True)
            path = self._download_url(signed_url.url, local_path)
            urls.append(path)

    return urls
download_status
download_status(download_id: str)

Get the status of a download.

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
198
199
200
201
202
203
204
def download_status(self, download_id: str):
    """
    Get the status of a download.

    Returns the parsed ``MultiFileDownload``, or None if the endpoint 404s.
    """
    return self._get(
        f"/partner/asset/download/{download_id}/status", MultiFileDownload
    )
download_zip
download_zip(download_id, download_path: str = '')

downloads all the media for a list of assets

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
113
114
115
116
117
118
119
120
121
def download_zip(self, download_id, download_path: str = ""):
    """
    Save the zip for bulk download ``download_id`` into ``download_path``.

    Returns the local file path, or None when the endpoint yields nothing.
    """
    signed = self._get(f"/partner/asset/download/bulk/{download_id}/zip", SignedUrl)
    return self._download_url(signed.url, download_path) if signed else None
mark_downloaded
mark_downloaded(media_access_id: str) -> DownloadResult

mark media_access as downloaded

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
188
189
190
191
192
193
194
195
196
def mark_downloaded(self, media_access_id: str) -> DownloadResult:
    """
    Flag a media access record as downloaded.

    Sends a body-less PUT and parses the reply as a DownloadResult.
    """
    endpoint = f"/partner/asset/mark/media/access/downloaded/{media_access_id}"
    return self._put(endpoint, None, DownloadResult)
media_access
media_access(media_access_id: str) -> DownloadResult

get media access

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
180
181
182
183
184
185
186
def media_access(self, media_access_id: str) -> DownloadResult:
    """
    Fetch one media access record, parsed as a DownloadResult.
    """
    endpoint = f"/partner/asset/media/access/{media_access_id}"
    return self._get(endpoint, DownloadResult)
prepare_license
prepare_license(license_id: str)

prepare for download of licensed data

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
157
158
159
160
161
def prepare_license(self, license_id: str):
    """
    Ask the server to prepare a bulk download of a license's data.

    Returns the parsed BulkRequest.
    """
    endpoint = f"/partner/asset/license/{license_id}/prepare"
    return self._get(endpoint, BulkRequest)
prepare_zip_license
prepare_zip_license(license_id: str)

prepare a download zip of licensed data

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
 97
 98
 99
100
101
102
103
def prepare_zip_license(self, license_id: str):
    """
    Prepare a download zip of a license's data.

    Returns the parsed MultiFileDownload.
    """
    endpoint = f"/partner/asset/license/{license_id}/prepare/zip"
    return self._get(endpoint, MultiFileDownload)
remaining_downloads
remaining_downloads(download_id: str) -> DownloadResults

get remaining bulk downloads

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
172
173
174
175
176
177
178
def remaining_downloads(self, download_id: str) -> DownloadResults:
    """
    List the files of a bulk download that have not been downloaded yet.
    """
    endpoint = f"/partner/asset/download/remaining/{download_id}"
    return self._get(endpoint, DownloadResults)
remaining_n_downloads
remaining_n_downloads(
    download_id: str, top: int
) -> DownloadResults

get remaining bulk downloads

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
163
164
165
166
167
168
169
170
def remaining_n_downloads(self, download_id: str, top: int) -> DownloadResults:
    """
    List at most ``top`` not-yet-downloaded files of a bulk download.
    """
    endpoint = f"/partner/asset/download/remaining/{download_id}/top/{top}"
    return self._get(endpoint, DownloadResults)
status_zip
status_zip(download_id: str)

get status of download

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
105
106
107
108
109
110
111
def status_zip(self, download_id: str):
    """
    get status of download

    Args:
        download_id: Id of the zip download to query.

    Returns:
        The parsed MultiFileDownload, or None if the server answered 404.
    """
    # Single slash before "zip": the previous "//zip" produced an empty path
    # segment that strict routers/firewalls reject or fail to match.
    return self._get(
        f"/partner/asset/download/status/{download_id}/zip", MultiFileDownload
    )

VermillioBaseClient

Base Vermillio Client providing authentication mechanism via VermillioConfig

Source code in packages/core/src/vermillio/sdk/core/clients/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class VermillioBaseClient:
    """
    Base Vermillio Client providing authentication mechanism via VermillioConfig

    Synchronous calls go through a ``requests`` Session that has an
    ``OAuthTokenAdapter`` mounted on ``base_url``. Asynchronous calls go
    through an ``AsyncOAuth2Client``, which fetches a client-credentials token
    on first use.
    """

    def __init__(
        self,
        config: Optional[VermillioConfig] = None,
        base_url: Union[str, None] = None,
    ):
        """
        Args:
            config: Client configuration; ``VermillioConfig.default()`` is used
                when omitted.
            base_url: Root URL for requests; defaults to
                ``config.trace_base_url``.
        """
        config = config or VermillioConfig.default()
        self._log = logging.getLogger(__name__)
        self._config = config
        self._session = Session()
        self._base_url = base_url or config.trace_base_url
        # The OAuth adapter is mounted on the base URL prefix only.
        self._session.mount(self._base_url, OAuthTokenAdapter(config))
        self._async_client = AsyncOAuth2Client(
            config.client_id,
            config.client_secret,
            token_endpoint_auth_method="client_secret_post",
        )

    def _encode(self, value: str) -> str:
        """URL-encode ``value`` via ``url_encode``, leaving '/' unescaped."""
        return url_encode(value, safe="/", encoding=None, errors=None)

    def _url(self, path: str):
        """Join ``path`` onto this client's base URL."""
        return _url(self._base_url, path)

    def _json(self, data: Union[Dict, BaseModel]):
        """Return dicts unchanged and dump pydantic models to a dict."""
        return data if isinstance(data, dict) else data.model_dump()

    def _or_error(
        self, result: Optional[BaseModelType], error: Exception
    ) -> BaseModelType:
        """Return ``result``, raising ``error`` if it is None."""
        if result is None:
            raise error
        return result

    def _get(
        self,
        path: str,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a GET; see ``_request`` for response handling."""
        return self._request(path, "GET", response_type, timeout, params)

    def _put(
        self,
        path: str,
        data: Union[Dict, BaseModel],
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a PUT with an optional JSON body; see ``_request``."""
        return self._request(path, "PUT", response_type, timeout, params, data, files)

    def _post(
        self,
        path: str,
        data: Union[Dict, BaseModel],
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a POST with an optional JSON body; see ``_request``."""
        return self._request(path, "POST", response_type, timeout, params, data, files)

    async def _async_post(
        self,
        path: str,
        data: Any,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[Union[str, Dict, BaseModelType]]:
        """Send an asynchronous POST; see ``_async_request``."""
        return await self._async_request(
            path, "POST", response_type, timeout, params, data, files
        )

    def _delete(
        self,
        path: str,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a DELETE; see ``_request``."""
        return self._request(path, "DELETE", response_type, timeout, params)

    async def _async_request(
        self,
        path: str,
        method: HttpMethod,
        response_type: Type[BaseModelType],
        timeout: Optional[float] = None,
        params: Optional[Dict] = None,
        body: Optional[Any] = None,
        files: Optional[Dict[str, IO]] = None,
    ) -> Optional[Union[str, Dict, BaseModelType]]:
        """
        Send an authenticated request with the async OAuth client.

        Returns None on 404 and raises on other non-2xx statuses. Otherwise it
        returns the body parsed as ``response_type``. If that fails it returns
        the decoded JSON, and failing that the raw text.
        """
        url = self._url(path)
        self._log.debug("Sending %s request to url %s", method, url)

        # NOTE(review): the token is fetched once and never refreshed here;
        # confirm expiry is handled by the client library.
        if not self._async_client.token:
            self._async_client.token = await self._async_client.fetch_token(
                _url(self._config.auth_base_url, "/oauth2/token"),
                grant_type="client_credentials",
                audience=self._config.audience,
            )
        res = await self._async_client.request(
            method,
            url,
            timeout=timeout,
            params=params,
            data=body,
            files=files,
        )
        self._log.debug(
            "Received %d for %s to url %s",
            res.status_code,
            res.request.method,
            res.request.url,
        )
        if res.status_code == 404:
            return None
        if not 200 <= res.status_code < 300:
            self._log.debug("Response: %s", res.content)
            res.raise_for_status()
        try:
            return response_type.model_validate_json(res.content)
        except Exception:
            try:
                return res.json()
            except Exception:
                return res.text

    def _request(
        self,
        path: str,
        method: HttpMethod,
        response_type: Type[BaseModelType],
        timeout: Optional[float] = None,
        params: Optional[Dict] = None,
        body: Optional[Union[Dict, BaseModel]] = None,
        files: Optional[Dict[str, IO]] = None,
    ) -> Optional[BaseModelType]:
        """
        Send an authenticated request through the session.

        A truthy ``body`` is sent as JSON.

        Returns:
            The response body parsed as ``response_type``, or None on 404.

        Raises:
            requests.HTTPError: for any other non-OK status.
        """
        url = self._url(path)
        # Pass each value as its own logging arg. A single tuple argument does
        # not fill multiple placeholders and makes formatting fail.
        self._log.debug("Sending %s request to url %s", method, url)
        res = self._session.request(
            method,
            url,
            timeout=timeout,
            params=params,
            json=self._json(body) if body else None,
            files=files,
        )
        self._log.debug(
            "Received %d for %s to url %s",
            res.status_code,
            res.request.method,
            res.request.url,
        )
        if res.status_code == 404:
            return None
        if not res.ok:
            self._log.debug("Response: %s", res.content)
            res.raise_for_status()
        return response_type.model_validate_json(res.content)

    def _download_file(self, url: str, local_filename: str) -> bool:
        """
        Download ``url`` to ``local_filename`` using requests with a tqdm
        progress bar.

        Uses plain ``requests.get`` rather than the OAuth session.

        Returns:
            True on success. False if the request failed or raised, or if the
            byte count differs from a non-zero Content-Length. Failures are
            logged, not raised.
        """
        try:
            with requests.get(url, stream=True) as r:
                r.raise_for_status()
                total_size_in_bytes = int(r.headers.get("content-length", 0))
                block_size = 1024  # 1 Kibibyte

                with tqdm(
                    total=total_size_in_bytes,
                    unit="iB",
                    unit_scale=True,
                    desc=local_filename,
                ) as progress_bar:
                    with open(local_filename, "wb") as file:
                        for data in r.iter_content(block_size):
                            progress_bar.update(len(data))
                            file.write(data)
                    received = progress_bar.n

            if total_size_in_bytes != 0 and received != total_size_in_bytes:
                self._log.warning(
                    "Incomplete download of %s: %d of %d bytes",
                    local_filename,
                    received,
                    total_size_in_bytes,
                )
                return False

            return True

        except Exception as err:
            # Stay best-effort (return False), but don't fail silently. The
            # URL is not logged because signed URLs carry credentials.
            self._log.warning(
                "Failed to download %s (%s)", local_filename, type(err).__name__
            )
            return False

VermillioBasePipelineClient

Bases: VermillioBaseClient

Base Client for interacting with External Pipeline API.

Source code in packages/core/src/vermillio/sdk/core/clients/pipelines.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class VermillioBasePipelineClient(VermillioBaseClient):
    """
    Base Client for interacting with External Pipeline API.

    All paths are relative to ``/external/pipeline/{name}`` on
    ``config.external_pipeline_url``. Uploads go through a separate
    ``VermillioAssets`` client built from the same config.
    """

    def __init__(self, name: str, config: Optional[VermillioConfig] = None):
        """
        Args:
            name: Pipeline name; it is URL-encoded into every request path.
            config: Client configuration; ``VermillioConfig.default()`` is used
                when omitted.
        """
        config = config or VermillioConfig.default()
        self._name = self._encode(name)
        self._assets = VermillioAssets(config)
        super().__init__(config, config.external_pipeline_url)

    def _url(self, path: str) -> str:
        """Prefix ``path`` with this pipeline's base path."""
        return super()._url(f"/external/pipeline/{self._name}{path}")

    def _pipeline(self, pipeline_cls: type[BaseModelType]) -> Optional[BaseModelType]:
        """Fetch the pipeline description, parsed as ``pipeline_cls``."""
        return self._get("", pipeline_cls)

    def _upload_sources(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
    ) -> list[ExternalSource]:
        """Upload each file as an asset; return one ExternalSource per media id."""
        if len(file_paths) != len(titles):
            raise ValueError(f"file_paths and titles must have equal length ({len(file_paths)} != {len(titles)}).")
        sources = []
        for file_path, title in zip(file_paths, titles):
            asset = self._assets.upload(file_path, asset_type, title)
            sources.extend(ExternalSource(media_id=media_id) for media_id in asset.media_ids)
        return sources

    def _upload_and_load(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
    ) -> Optional[list[BaseModelType]]:
        """
        Upload files and load their media into the pipeline.

        Raises:
            ValueError: if ``file_paths`` and ``titles`` differ in length.
        """
        sources = self._upload_sources(file_paths, asset_type, titles)
        return self._load(PipelineLoadRequest(context=context, sources=sources), source_cls)

    def _load(
        self, load_request: PipelineLoadRequest, source_cls: type[BaseModelType]
    ) -> Optional[list[BaseModelType]]:
        """POST a load request; return the created sources, or None on 404."""
        res = self._post("", load_request, RootModel[list[source_cls]])
        return res.root if res else None

    def _load_results(
        self,
        load_request: PipelineLoadRequest,
        source_cls: type[BaseModelType],
        results_cls: type[BaseModelType],
        wait: bool = True,
    ) -> Optional[list[BaseModelType]]:
        """
        Load sources, then fetch results for each of them.

        When ``wait`` is true, ``_results`` polls each source until it is
        finished. Returns None if nothing was loaded.
        """
        sources = self._load(load_request, source_cls)
        if not sources:
            return None
        return [
            self._results(cast(PipelineSource, s).id, results_cls, load_request.context, wait)
            for s in sources
        ]

    def _upload_and_load_results(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
        source_cls: type[BaseModelType],
        results_cls: type[BaseModelType],
        context: Optional[str] = None,
        wait: bool = True,
    ) -> Optional[list[BaseModelType]]:
        """
        Upload files, load them into the pipeline and fetch their results.

        Raises:
            ValueError: if ``file_paths`` and ``titles`` differ in length.
        """
        sources = self._upload_sources(file_paths, asset_type, titles)
        return self._load_results(
            PipelineLoadRequest(context=context, sources=sources),
            source_cls,
            results_cls,
            wait,
        )

    def _run(
        self,
        run_request: PipelineRunRequest,
        source_cls: type[BaseModelType],
        timeout: float = 60.0,
        poll_rate: float = 1.0,
    ) -> Optional[BaseModelType]:
        """POST to ``/run``; ``timeout`` and ``poll_rate`` go as query params."""
        # TODO: bounds on params/validate?
        return self._post(
            "/run",
            run_request,
            source_cls,
            {"timeout": timeout, "poll_rate": poll_rate},
        )

    # TODO: run indefinitely local polling?

    def _run_results(
        self,
        run_request: PipelineRunRequest,
        results_cls: type[BaseModelType],
        timeout: float = 60.0,
        poll_rate: float = 1.0,
    ) -> Optional[BaseModelType]:
        """POST to ``/run/results``; ``timeout``/``poll_rate`` go as query params."""
        # TODO: bounds on params/validate?
        return self._post(
            "/run/results",
            run_request,
            results_cls,
            {"timeout": timeout, "poll_rate": poll_rate},
        )

    def _status(
        self, status_cls: type[BaseModelType], context: Optional[str] = None
    ) -> Optional[BaseModelType]:
        """Fetch pipeline status for ``context``."""
        return self._get("/status", status_cls, {"context": context})

    def _sources(
        self,
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
        after_id: Optional[str] = None,
        limit: int = 10,
    ) -> Optional[list[BaseModelType]]:
        """List sources, paged by ``after_id`` and ``limit``."""
        # TODO: validate params
        res = self._get(
            "/sources",
            RootModel[list[source_cls]],
            {"context": context, "after_id": after_id, "limit": limit},
        )
        return res.root if res else None

    def _source(
        self,
        id_or_source_id: str,
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
    ) -> Optional[BaseModelType]:
        """Fetch a single source by pipeline id or caller-provided source id."""
        return self._get(
            f"/source/{self._encode(id_or_source_id)}", source_cls, {"context": context}
        )

    def _results(
        self,
        id_or_source_id: str,
        results_cls: type[BaseModelType],
        context: Optional[str] = None,
        wait: bool = False,
        poll_rate: float = 1.0,
        # TODO: add a timeout?
    ) -> Optional[BaseModelType]:
        """
        Fetch results for a source.

        If ``wait`` is true and the result is a PipelineResults that is not yet
        finished, it is re-fetched every ``poll_rate`` seconds. There is no
        upper bound on the number of polls.
        """
        path = f"/source/{self._encode(id_or_source_id)}/results"
        params = {"context": context}

        result = self._get(path, results_cls, params)
        while (
            wait
            and result
            and isinstance(result, PipelineResults)
            and not result.is_finished
        ):
            time.sleep(poll_rate)
            result = self._get(path, results_cls, params)
        return result

assets

VermillioAssets

Bases: VermillioBaseClient

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class VermillioAssets(VermillioBaseClient):
    """
    Client for the partner asset endpoints. It lists and uploads assets and
    downloads licensed media.

    Requests go through the base client's ``_get``/``_post``/``_put``, which
    return None when the server answers 404.
    """

    def list(self) -> List[PartnerAsset]:
        """Return all partner assets, or an empty list on 404."""
        res = self._get("/partner/asset/list", RootModel[List[PartnerAsset]])
        return res.root if res else []

    def get(self, id: str) -> Optional[PartnerAsset]:
        """Return the partner asset with the given id, or None on 404."""
        return self._get(f"/partner/asset/{id}", PartnerAsset)

    def upload(
        self,
        file_path: str,
        asset_type: AssetType,
        title: str,
        parent_asset_id: Optional[str] = None,
    ) -> Optional[PartnerAsset]:
        """
        Upload a local file as a new partner asset.

        Args:
            file_path: Path of the file to upload.
            asset_type: Asset type, sent as the ``type`` query parameter.
            title: Asset title, sent as the ``title`` query parameter.
            parent_asset_id: Optional parent, sent as ``parentAssetId``.

        Returns:
            The created PartnerAsset, or None on 404.

        Raises:
            AssertionError: if ``file_path`` is not an existing file.
        """
        # NOTE(review): assert is stripped under ``python -O``; open() would
        # then raise FileNotFoundError instead.
        assert os.path.isfile(file_path), f"Failed to find file: '{file_path}'"
        with open(file_path, "rb") as f:
            return self._post(
                "/partner/asset/upload",
                None,
                PartnerAsset,
                {
                    "type": asset_type,
                    "title": title,
                    **({"parentAssetId": parent_asset_id} if parent_asset_id else {}),
                },
                # The multipart field is keyed by the file's base name.
                {(os.path.basename(file_path)): f},
            )

    def _get_filename(self, file_name: str):
        """Return the last path segment of a URL, ignoring any query string."""
        parsed_url = urlparse(file_name)

        return Path(parsed_url.path).name

    def _download_url(self, url: str, download_path: str):
        """
        Download ``url`` into ``download_path`` under the URL's file name.

        Returns the target path. The success flag from ``_download_file`` is
        not checked. NOTE(review): the path is returned even if the download
        failed.
        """
        filename = self._get_filename(url)
        path = os.path.join(download_path, filename)

        self._download_file(url, path)

        return path

    def download(self, media_id: str, download_path: str = "") -> Optional[str]:
        """
        downloads a specific media

        The signed URL's ``local_path`` sub-directory is appended to
        ``download_path`` (and created) when set. Returns the local path, or
        None when no signed URL was returned.
        """
        result = self._get(f"/partner/asset/download/media/{media_id}", SignedUrl)
        if result:
            if result.local_path:
                download_path = os.path.join(download_path, result.local_path)
                os.makedirs(download_path, exist_ok=True)
            return self._download_url(result.url, download_path)

        return None

    def download_media_by_license(
        self, license_id: str, asset_id: Optional[str] = None, download_path: str = ""
    ) -> List[str]:
        """
        downloads all media for a license

        ``asset_id`` is interpolated into the URL as-is, so ``None`` becomes
        the literal segment ``"None"``. NOTE(review): confirm the intended
        server behavior for that case.

        Returns the local paths downloaded to, one per signed URL. The list is
        empty when the server returned nothing.
        """
        result = self._get(
            f"/partner/asset/download/license/{license_id}/asset/{asset_id}", SignedUrls
        )
        paths = []
        if result:
            for signed_url in result.urls:
                local_path = download_path
                if signed_url.local_path:
                    local_path = os.path.join(local_path, signed_url.local_path)
                    os.makedirs(local_path, exist_ok=True)
                paths.append(self._download_url(signed_url.url, local_path))

        return paths

    def prepare_zip_license(self, license_id: str):
        """
        prepare a download zip of licensed data
        """
        return self._get(
            f"/partner/asset/license/{license_id}/prepare/zip", MultiFileDownload
        )

    def status_zip(self, download_id: str):
        """
        get status of download
        """
        # Single slash before "zip"; the previous "//zip" added an empty path
        # segment.
        return self._get(
            f"/partner/asset/download/status/{download_id}/zip", MultiFileDownload
        )

    def download_zip(self, download_id: str, download_path: str = "") -> Optional[str]:
        """
        downloads all the media for a list of assets

        Returns the local path of the zip, or None when no signed URL was
        returned.
        """
        result = self._get(f"/partner/asset/download/bulk/{download_id}/zip", SignedUrl)
        if result:
            return self._download_url(result.url, download_path)

        return None

    def download_license_zip(self, license_id: str, download_path: str = ""):
        """
        downloads all the media for a license

        Returns the local path of the zip, or None when no signed URL was
        returned.
        """
        result = self._get(
            f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
        )
        if result:
            return self._download_url(result.url, download_path)

        return None

    def bulk_download_license_zip(self, license_id: str, download_path: str = ""):
        """
        downloads all the media for a license

        Same endpoint and behavior as ``download_license_zip``.
        """
        return self.download_license_zip(license_id, download_path)

    def prepare_license(self, license_id: str):
        """
        prepare for download of licensed data
        """
        return self._get(f"/partner/asset/license/{license_id}/prepare", BulkRequest)

    def remaining_n_downloads(self, download_id: str, top: int) -> DownloadResults:
        """
        get at most ``top`` remaining bulk downloads
        """
        return self._get(
            f"/partner/asset/download/remaining/{download_id}/top/{top}",
            DownloadResults,
        )

    def remaining_downloads(self, download_id: str) -> DownloadResults:
        """
        get remaining bulk downloads
        """
        return self._get(
            f"/partner/asset/download/remaining/{download_id}", DownloadResults
        )

    def media_access(self, media_access_id: str) -> DownloadResult:
        """
        get media access
        """
        return self._get(
            f"/partner/asset/media/access/{media_access_id}", DownloadResult
        )

    def mark_downloaded(self, media_access_id: str) -> DownloadResult:
        """
        mark media_access as downloaded
        """
        return self._put(
            f"/partner/asset/mark/media/access/downloaded/{media_access_id}",
            None,
            DownloadResult,
        )

    def download_status(self, download_id: str):
        """
        get the status of a prepared download
        """
        return self._get(
            f"/partner/asset/download/{download_id}/status", MultiFileDownload
        )

    def bulk_download_license(self, license_id: str, download_path: str = ""):
        """
        bulk download license

        Prepares a bulk request for the license. It then repeatedly fetches the
        remaining files, downloads each one and marks it as downloaded, until
        nothing remains.

        It stops early when the prepare or remaining call returns None (404).
        It also stops when a batch contains only files already handled in this
        call, which would otherwise loop forever. Returns None.
        """
        request = self.prepare_license(license_id)
        if request is None:
            return None

        handled = set()
        files = self.remaining_downloads(request.id)
        while files is not None and len(files.results) > 0:
            pending = [file for file in files.results if file.id not in handled]
            if not pending:
                # The server keeps reporting files we've already marked, so
                # stop instead of re-downloading them forever.
                break
            for file in pending:
                filename = self._get_filename(file.signed_url.url)
                local_path = download_path
                if file.signed_url.local_path:
                    local_path = os.path.join(local_path, file.signed_url.local_path)
                    os.makedirs(local_path, exist_ok=True)
                path = os.path.join(local_path, filename)

                self._download_file(file.signed_url.url, path)
                print(f"Marking {file.id} as downloaded")
                self.mark_downloaded(file.id)
                handled.add(file.id)

            files = self.remaining_downloads(request.id)

        return None
bulk_download_license
bulk_download_license(
    license_id: str, download_path: str = ""
)

bulk download license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def bulk_download_license(self, license_id: str, download_path: str = ""):
    """
    bulk download license

    Prepares a bulk request for the license. It then repeatedly fetches the
    remaining files, downloads each one and marks it as downloaded, until
    nothing remains.

    It stops early when the prepare or remaining call returns None (404). It
    also stops when a batch contains only files already handled in this call,
    which would otherwise loop forever. Returns None.
    """
    request = self.prepare_license(license_id)
    if request is None:
        return None

    handled = set()
    files = self.remaining_downloads(request.id)
    while files is not None and len(files.results) > 0:
        pending = [file for file in files.results if file.id not in handled]
        if not pending:
            # The server keeps reporting files we've already marked, so stop
            # instead of re-downloading them forever.
            break
        for file in pending:
            filename = self._get_filename(file.signed_url.url)
            local_path = download_path
            if file.signed_url.local_path:
                local_path = os.path.join(local_path, file.signed_url.local_path)
                os.makedirs(local_path, exist_ok=True)
            path = os.path.join(local_path, filename)

            self._download_file(file.signed_url.url, path)
            print(f"Marking {file.id} as downloaded")
            self.mark_downloaded(file.id)
            handled.add(file.id)

        files = self.remaining_downloads(request.id)

    return None
bulk_download_license_zip
bulk_download_license_zip(
    license_id: str, download_path: str = ""
)

downloads all the media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def bulk_download_license_zip(self, license_id: str, download_path: str = ""):
    """
    Download the bulk zip archive for a license.

    Returns the local path the archive was written to, or None when the
    server returned no signed URL.
    """
    signed = self._get(
        f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
    )
    if not signed:
        return None
    return self._download_url(signed.url, download_path)
download
download(media_id: str, download_path: str = '') -> str

downloads a specific media

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
63
64
65
66
67
68
69
70
71
72
73
74
def download(self, media_id: str, download_path: str = "") -> Optional[str]:
    """
    downloads a specific media

    Args:
        media_id: Id of the media to download.
        download_path: Directory to write into. The signed URL's
            ``local_path`` sub-directory is appended (and created) when set.

    Returns:
        The local path the file was downloaded to, or None when no signed URL
        was returned (e.g. a 404).
    """
    result = self._get(f"/partner/asset/download/media/{media_id}", SignedUrl)
    if not result:
        return None
    if result.local_path:
        download_path = os.path.join(download_path, result.local_path)
        os.makedirs(download_path, exist_ok=True)
    return self._download_url(result.url, download_path)
download_license_zip
download_license_zip(
    license_id: str, download_path: str = ""
)

downloads all the media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def download_license_zip(self, license_id: str, download_path: str = ""):
    """
    Download the bulk zip archive for a license.

    Returns the local path the archive was written to, or None when the
    server returned no signed URL.
    """
    signed = self._get(
        f"/partner/asset/download/bulk/license/{license_id}/zip", SignedUrl
    )
    if not signed:
        return None
    return self._download_url(signed.url, download_path)
download_media_by_license
download_media_by_license(
    license_id: str,
    asset_id: str = None,
    download_path: str = "",
) -> str

downloads all media for a license

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def download_media_by_license(
    self, license_id: str, asset_id: Optional[str] = None, download_path: str = ""
) -> List[str]:
    """
    downloads all media for a license

    Args:
        license_id: Id of the license whose media should be downloaded.
        asset_id: Asset within the license. It is interpolated into the URL
            as-is, so ``None`` becomes the literal segment ``"None"``.
            NOTE(review): confirm the server's intended behavior for that case.
        download_path: Base directory to download into. Each signed URL's
            ``local_path`` sub-directory is appended (and created) when set.

    Returns:
        The local file paths that were downloaded to, one per signed URL, or
        an empty list when the server returned nothing (e.g. a 404).
    """
    result = self._get(
        f"/partner/asset/download/license/{license_id}/asset/{asset_id}", SignedUrls
    )
    paths = []
    if result:
        for signed_url in result.urls:
            local_path = download_path
            if signed_url.local_path:
                local_path = os.path.join(local_path, signed_url.local_path)
                os.makedirs(local_path, exist_ok=True)
            paths.append(self._download_url(signed_url.url, local_path))

    return paths
download_status
download_status(download_id: str)

get the status of a prepared download

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
198
199
200
201
202
203
204
def download_status(self, download_id: str):
    """
    get the status of a prepared download

    Args:
        download_id: Id of the download to query.

    Returns:
        The parsed MultiFileDownload, or None if the server answered 404.
    """
    return self._get(
        f"/partner/asset/download/{download_id}/status", MultiFileDownload
    )
download_zip
download_zip(download_id, download_path: str = '')

downloads all the media for a list of assets

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
113
114
115
116
117
118
119
120
121
def download_zip(self, download_id, download_path: str = ""):
    """
    Download the zip archive of a bulk download.

    Returns the local path of the archive, or None when the server returned
    no signed URL for it.
    """
    signed = self._get(f"/partner/asset/download/bulk/{download_id}/zip", SignedUrl)
    return self._download_url(signed.url, download_path) if signed else None
mark_downloaded
mark_downloaded(media_access_id: str) -> DownloadResult

mark media_access as downloaded

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
188
189
190
191
192
193
194
195
196
def mark_downloaded(self, media_access_id: str) -> DownloadResult:
    """
    Flag a media access record as downloaded.

    Sends a body-less PUT and parses the reply as a DownloadResult.
    """
    endpoint = f"/partner/asset/mark/media/access/downloaded/{media_access_id}"
    return self._put(endpoint, None, DownloadResult)
media_access
media_access(media_access_id: str) -> DownloadResult

get media access

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
180
181
182
183
184
185
186
def media_access(self, media_access_id: str) -> DownloadResult:
    """
    Fetch one media access record, parsed as a DownloadResult.
    """
    endpoint = f"/partner/asset/media/access/{media_access_id}"
    return self._get(endpoint, DownloadResult)
prepare_license
prepare_license(license_id: str)

prepare for download of licensed data

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
157
158
159
160
161
def prepare_license(self, license_id: str):
    """
    Ask the server to prepare a bulk download of a license's data.

    Returns the parsed BulkRequest.
    """
    endpoint = f"/partner/asset/license/{license_id}/prepare"
    return self._get(endpoint, BulkRequest)
prepare_zip_license
prepare_zip_license(license_id: str)

prepare a download zip of licensed data

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
 97
 98
 99
100
101
102
103
def prepare_zip_license(self, license_id: str):
    """
    Prepare a download zip of a license's data.

    Returns the parsed MultiFileDownload.
    """
    endpoint = f"/partner/asset/license/{license_id}/prepare/zip"
    return self._get(endpoint, MultiFileDownload)
remaining_downloads
remaining_downloads(download_id: str) -> DownloadResults

get remaining bulk downloads

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
172
173
174
175
176
177
178
def remaining_downloads(self, download_id: str) -> DownloadResults:
    """
    List the files of a bulk download that have not been downloaded yet.
    """
    endpoint = f"/partner/asset/download/remaining/{download_id}"
    return self._get(endpoint, DownloadResults)
remaining_n_downloads
remaining_n_downloads(
    download_id: str, top: int
) -> DownloadResults

get remaining bulk downloads

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
163
164
165
166
167
168
169
170
def remaining_n_downloads(self, download_id: str, top: int) -> DownloadResults:
    """
    List remaining bulk downloads for a download request, restricted by ``top``.

    Args:
        download_id: Id of the bulk download request.
        top: Sent as the ``/top/{top}`` path segment — presumably caps how
            many remaining entries are returned (confirm against the API).

    Returns:
        The response parsed as ``DownloadResults``.
    """
    endpoint = f"/partner/asset/download/remaining/{download_id}/top/{top}"
    return self._get(endpoint, DownloadResults)
status_zip
status_zip(download_id: str)

get status of download

Source code in packages/core/src/vermillio/sdk/core/clients/assets.py
105
106
107
108
109
110
111
def status_zip(self, download_id: str):
    """
    Get the status of a zip download.

    Args:
        download_id: Id of the zip download to check.

    Returns:
        The response parsed as ``MultiFileDownload`` (possibly None if
        ``_get`` reports a missing resource).
    """
    # The path used to contain "//zip" (an empty path segment). That was a typo
    # and is inconsistent with the sibling ".../prepare/zip" route.
    return self._get(
        f"/partner/asset/download/status/{download_id}/zip", MultiFileDownload
    )

base

VermillioBaseClient

Base Vermillio Client providing authentication mechanism via VermillioConfig

Source code in packages/core/src/vermillio/sdk/core/clients/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class VermillioBaseClient:
    """
    Base Vermillio Client providing authentication mechanism via VermillioConfig.

    Synchronous calls (``_get``/``_put``/``_post``/``_delete``) go through a
    session with an ``OAuthTokenAdapter`` mounted on the base URL. The async
    call (``_async_post``) goes through an ``AsyncOAuth2Client`` that fetches a
    client-credentials token on first use.
    """

    def __init__(
        self,
        config: Optional[VermillioConfig] = None,
        base_url: Union[str, None] = None,
    ):
        """
        Args:
            config: Credentials/environment; falls back to ``VermillioConfig.default()``.
            base_url: Root URL for requests; defaults to ``config.trace_base_url``.
        """
        config = config or VermillioConfig.default()
        self._log = logging.getLogger(__name__)
        self._config = config
        self._session = Session()
        self._base_url = base_url or config.trace_base_url
        # Presumably the adapter attaches OAuth tokens to every request under
        # the base URL (OAuthTokenAdapter is defined elsewhere).
        self._session.mount(self._base_url, OAuthTokenAdapter(config))
        self._async_client = AsyncOAuth2Client(
            config.client_id,
            config.client_secret,
            token_endpoint_auth_method="client_secret_post",
        )

    def _encode(self, value: str) -> str:
        """Percent-encode ``value`` for use in a URL path, leaving ``/`` unescaped."""
        return url_encode(value, safe="/", encoding=None, errors=None)

    def _url(self, path: str):
        """Join ``path`` onto this client's base URL."""
        return _url(self._base_url, path)

    def _json(self, data: Union[Dict, BaseModel]):
        """Return ``data`` unchanged if it is a dict, otherwise its ``model_dump()``."""
        return data if isinstance(data, dict) else data.model_dump()

    def _or_error(
        self, result: Optional[BaseModelType], error: Exception
    ) -> BaseModelType:
        """Return ``result``, raising ``error`` if it is None."""
        if result is None:
            raise error
        return result

    def _get(
        self,
        path: str,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a GET request; see ``_request`` for response handling."""
        return self._request(path, "GET", response_type, timeout, params)

    def _put(
        self,
        path: str,
        data: Union[Dict, BaseModel],
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a PUT request with ``data`` as the JSON body; see ``_request``."""
        return self._request(path, "PUT", response_type, timeout, params, data, files)

    def _post(
        self,
        path: str,
        data: Union[Dict, BaseModel],
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a POST request with ``data`` as the JSON body; see ``_request``."""
        return self._request(path, "POST", response_type, timeout, params, data, files)

    async def _async_post(
        self,
        path: str,
        data: Any,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        files: Optional[Dict[str, IO]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[Union[str, Dict, BaseModelType]]:
        """Send an async POST request; see ``_async_request`` for response handling."""
        return await self._async_request(
            path, "POST", response_type, timeout, params, data, files
        )

    def _delete(
        self,
        path: str,
        response_type: Type[BaseModelType],
        params: Optional[Dict] = None,
        timeout: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """Send a DELETE request; see ``_request`` for response handling."""
        return self._request(path, "DELETE", response_type, timeout, params)

    async def _async_request(
        self,
        path: str,
        method: HttpMethod,
        response_type: Type[BaseModelType],
        timeout: Optional[float] = None,
        params: Optional[Dict] = None,
        body: Optional[Any] = None,
        files: Optional[Dict[str, IO]] = None,
    ) -> Optional[Union[str, Dict, BaseModelType]]:
        """
        Send an authenticated request through the async OAuth client.

        Returns:
            None on 404. Otherwise the body parsed as ``response_type``; if
            that fails, the decoded JSON; if that also fails, the raw text.

        Raises:
            The HTTP error from ``raise_for_status`` on other non-2xx statuses.
        """
        url = self._url(path)
        self._log.debug("Sending %s request to url %s", method, url)

        # The token is fetched lazily, once; it is not refreshed here.
        if not self._async_client.token:
            self._async_client.token = await self._async_client.fetch_token(
                _url(self._config.auth_base_url, "/oauth2/token"),
                grant_type="client_credentials",
                audience=self._config.audience,
            )
        res = await self._async_client.request(
            method,
            url,
            timeout=timeout,
            params=params,
            data=body,
            files=files,
        )
        self._log.debug(
            "Received %d for %s to url %s",
            res.status_code,
            res.request.method,
            res.request.url,
        )
        if res.status_code == 404:
            return None
        if not 200 <= res.status_code < 300:
            self._log.debug("Response: %s", res.content)
            res.raise_for_status()
        try:
            return response_type.model_validate_json(res.content)
        except Exception:
            try:
                return res.json()
            except Exception:
                return res.text

    def _request(
        self,
        path: str,
        method: HttpMethod,
        response_type: Type[BaseModelType],
        timeout: Optional[float] = None,
        params: Optional[Dict] = None,
        body: Optional[Union[Dict, BaseModel]] = None,
        files: Optional[Dict[str, IO]] = None,
    ) -> Optional[BaseModelType]:
        """
        Send an authenticated request through the synchronous session.

        Returns:
            None on 404, otherwise the body validated as ``response_type``.

        Raises:
            The HTTP error from ``raise_for_status`` on other non-OK statuses.
        """
        url = self._url(path)
        # Logging args must be passed individually: a single tuple would only
        # fill the first placeholder, and formatting fails at DEBUG level.
        self._log.debug("Sending %s request to url %s", method, url)
        res = self._session.request(
            method,
            url,
            timeout=timeout,
            params=params,
            json=self._json(body) if body else None,
            files=files,
        )
        self._log.debug(
            "Received %d for %s to url %s",
            res.status_code,
            res.request.method,
            res.request.url,
        )
        if res.status_code == 404:
            return None
        if not res.ok:
            self._log.debug("Response: %s", res.content)
            res.raise_for_status()
        return response_type.model_validate_json(res.content)

    def _download_file(self, url: str, local_filename: str) -> bool:
        """
        Download ``url`` to ``local_filename`` using requests, with a tqdm progress bar.

        Returns:
            True on success. False if any exception occurs, or if a non-zero
            Content-Length was advertised and the byte count received differs.
        """
        try:
            with requests.get(url, stream=True) as r:
                r.raise_for_status()
                total_size_in_bytes = int(r.headers.get("content-length", 0))
                block_size = 1024  # 1 Kibibyte

                with tqdm(
                    total=total_size_in_bytes,
                    unit="iB",
                    unit_scale=True,
                    desc=local_filename,
                ) as progress_bar:
                    with open(local_filename, "wb") as file:
                        for data in r.iter_content(block_size):
                            progress_bar.update(len(data))
                            file.write(data)

            # A size mismatch indicates a truncated/incomplete transfer.
            if total_size_in_bytes != 0 and progress_bar.n != total_size_in_bytes:
                return False

            return True

        except Exception:
            # Deliberately best-effort: callers only receive a success flag.
            return False

pipelines

VermillioBasePipelineClient

Bases: VermillioBaseClient

Base Client for interacting with External Pipeline API.

Source code in packages/core/src/vermillio/sdk/core/clients/pipelines.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class VermillioBasePipelineClient(VermillioBaseClient):
    """
    Base Client for interacting with External Pipeline API.

    Every request path is scoped under ``/external/pipeline/{name}`` on the
    configured ``external_pipeline_url``.
    """

    def __init__(self, name: str, config: Optional[VermillioConfig] = None):
        """
        Args:
            name: Pipeline name; URL-encoded once and reused in every path.
            config: Credentials/environment; falls back to ``VermillioConfig.default()``.
        """
        config = config or VermillioConfig.default()
        self._name = self._encode(name)
        self._assets = VermillioAssets(config)
        super().__init__(config, config.external_pipeline_url)

    def _url(self, path: str) -> str:
        """Prefix ``path`` with this pipeline's base route."""
        return super()._url(f"/external/pipeline/{self._name}{path}")

    def _pipeline(self, pipeline_cls: type[BaseModelType]) -> Optional[BaseModelType]:
        """GET the pipeline resource itself, parsed as ``pipeline_cls``."""
        return self._get("", pipeline_cls)

    def _upload_sources(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
    ) -> list[ExternalSource]:
        """Upload each file as an asset and wrap every resulting media id in an ExternalSource."""
        if len(file_paths) != len(titles):
            raise ValueError(f"file_paths and titles must have equal length ({len(file_paths)} != {len(titles)}).")
        media_ids = []
        for file_path, title in zip(file_paths, titles):
            asset = self._assets.upload(file_path, asset_type, title)
            # One uploaded asset may yield several media ids.
            media_ids.extend(asset.media_ids)
        return [ExternalSource(media_id=media_id) for media_id in media_ids]

    def _upload_and_load(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
    ) -> Optional[list[BaseModelType]]:
        """
        Upload files, then load their media into the pipeline.

        Raises:
            ValueError: if ``file_paths`` and ``titles`` differ in length.
        """
        sources = self._upload_sources(file_paths, asset_type, titles)
        return self._load(PipelineLoadRequest(context=context, sources=sources), source_cls)

    def _load(
        self, load_request: PipelineLoadRequest, source_cls: type[BaseModelType]
    ) -> Optional[list[BaseModelType]]:
        """POST ``load_request``; the response is parsed as a JSON list of ``source_cls``."""
        res = self._post("", load_request, RootModel[list[source_cls]])
        return res.root if res else None

    def _load_results(
        self,
        load_request: PipelineLoadRequest,
        source_cls: type[BaseModelType],
        results_cls: type[BaseModelType],
        wait: bool = True,
    ) -> Optional[list[BaseModelType]]:
        """Load sources, then fetch results for each one (sequentially, optionally waiting)."""
        sources = self._load(load_request, source_cls)
        if not sources:
            return None
        return [
            self._results(cast(PipelineSource, s).id, results_cls, load_request.context, wait)
            for s in sources
        ]

    def _upload_and_load_results(
        self,
        file_paths: list[str],
        asset_type: AssetType,
        titles: list[str],
        source_cls: type[BaseModelType],
        results_cls: type[BaseModelType],
        context: Optional[str] = None,
        wait: bool = True,
    ) -> Optional[list[BaseModelType]]:
        """
        Upload files, load their media into the pipeline, then fetch results.

        Raises:
            ValueError: if ``file_paths`` and ``titles`` differ in length.
        """
        sources = self._upload_sources(file_paths, asset_type, titles)
        return self._load_results(
            PipelineLoadRequest(context=context, sources=sources),
            source_cls,
            results_cls,
            wait,
        )

    def _run(
        self,
        run_request: PipelineRunRequest,
        source_cls: type[BaseModelType],
        timeout: float = 60.0,
        poll_rate: float = 1.0,
    ) -> Optional[BaseModelType]:
        """POST to ``/run``; ``timeout`` and ``poll_rate`` are sent as query params."""
        # TODO: bounds on params/validate?
        return self._post(
            "/run",
            run_request,
            source_cls,
            {"timeout": timeout, "poll_rate": poll_rate},
        )

    # TODO: run indefinitely local polling?

    def _run_results(
        self,
        run_request: PipelineRunRequest,
        results_cls: type[BaseModelType],
        timeout: float = 60.0,
        poll_rate: float = 1.0,
    ) -> Optional[BaseModelType]:
        """POST to ``/run/results``; ``timeout`` and ``poll_rate`` are sent as query params."""
        # TODO: bounds on params/validate?
        return self._post(
            "/run/results",
            run_request,
            results_cls,
            {"timeout": timeout, "poll_rate": poll_rate},
        )

    def _status(
        self, status_cls: type[BaseModelType], context: Optional[str] = None
    ) -> Optional[BaseModelType]:
        """GET ``/status`` for the given (or default) context."""
        return self._get("/status", status_cls, {"context": context})

    def _sources(
        self,
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
        after_id: Optional[str] = None,
        limit: int = 10,
    ) -> Optional[list[BaseModelType]]:
        """GET one page of ``/sources``, paged by ``after_id`` and ``limit``."""
        # TODO: validate params
        res = self._get(
            "/sources",
            RootModel[list[source_cls]],
            {"context": context, "after_id": after_id, "limit": limit},
        )
        return res.root if res else None

    def _source(
        self,
        id_or_source_id: str,
        source_cls: type[BaseModelType],
        context: Optional[str] = None,
    ) -> Optional[BaseModelType]:
        """GET a single source by Vermillio id or external source id."""
        return self._get(
            f"/source/{self._encode(id_or_source_id)}", source_cls, {"context": context}
        )

    def _results(
        self,
        id_or_source_id: str,
        results_cls: type[BaseModelType],
        context: Optional[str] = None,
        wait: bool = False,
        poll_rate: float = 1.0,
        max_wait: Optional[float] = None,
    ) -> Optional[BaseModelType]:
        """
        GET the results for a source, optionally polling until it finishes.

        Args:
            id_or_source_id: Vermillio id or external source id.
            results_cls: Model to parse the response into.
            context: Optional context override.
            wait: If True and the result is a ``PipelineResults`` that is not
                finished, keep re-fetching until ``is_finished``.
            poll_rate: Seconds to sleep between polls.
            max_wait: Optional upper bound in seconds on total polling time.
                None (the default) polls until finished, as before.

        Returns:
            The latest fetched result (possibly unfinished if ``max_wait``
            elapsed), or None if the source was not found.
        """
        path = f"/source/{self._encode(id_or_source_id)}/results"
        params = {"context": context}
        deadline = None if max_wait is None else time.monotonic() + max_wait

        result = self._get(path, results_cls, params)
        # isinstance also rules out None, so a 404 ends polling immediately.
        while wait and isinstance(result, PipelineResults) and not result.is_finished:
            if deadline is not None and time.monotonic() >= deadline:
                break
            time.sleep(poll_rate)
            result = self._get(path, results_cls, params)
        return result

models

ExternalPipeline pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class ExternalPipeline(BaseModel):
    # Metadata describing a single external pipeline. The `description=`
    # strings below are runtime values (part of the model's schema).
    id: str = Field(description="Unique id of the pipeline.")
    name: str = Field(description="The friendly name of the pipeline.")
    # Only optional field; defaults to None when absent.
    description: Optional[str] = Field(
        default=None, description="Description of what the purpose of this pipeline is."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this pipeline was created."
    )
    contexts: list[str] = Field(
        description="The list of contexts available in this pipeline."
    )
    # NOTE(review): not validated to be a member of `contexts` here — confirm
    # the server guarantees it.
    default_context: str = Field(
        description="The default context used if none is provided in endpoints."
    )
contexts pydantic-field
contexts: list[str]

The list of contexts available in this pipeline.

created_at pydantic-field
created_at: float

The epoch time in seconds of when this pipeline was created.

default_context pydantic-field
default_context: str

The default context used if none is provided in endpoints.

description pydantic-field
description: Optional[str] = None

Description of what the purpose of this pipeline is.

id pydantic-field
id: str

Unique id of the pipeline.

name pydantic-field
name: str

The friendly name of the pipeline.

ExternalSource pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
22
23
24
25
26
27
28
29
30
31
32
33
34
class ExternalSource(BaseModel):
    # Reference to input for a pipeline; every field is optional. The
    # pipeline client's upload helpers build these with only `media_id` set.
    # The `description=` strings are runtime values (part of the schema).
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    media_id: Optional[str] = Field(
        default=None,
        description="Optional media id to pass through to the underlying pipeline call.",
    )
    asset_id: Optional[str]= Field(
        default=None,
        description="Optional asset id to pass through to the underlying pipeline call.",
    )
asset_id pydantic-field
asset_id: Optional[str] = None

Optional asset id to pass through to the underlying pipeline call.

media_id pydantic-field
media_id: Optional[str] = None

Optional media id to pass through to the underlying pipeline call.

source_id pydantic-field
source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

PipelineLoadRequest pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
47
48
49
50
51
52
53
54
class PipelineLoadRequest(BaseModel):
    # Body POSTed by the pipeline client's `_load` to enqueue one or more sources.
    context: Optional[str] = Field(
        default=None,
        description="Override the default context, if available, to load the sources against.",
    )
    # Required. Either a list of ExternalSource objects or a list of plain
    # strings (file paths, only if the pipeline supports that).
    sources: Union[list[ExternalSource], list[str]] = Field(
        description="The sources to load, either an external source or a path to a file (if the pipeline supports that).",
    )
context pydantic-field
context: Optional[str] = None

Override the default context, if available, to load the sources against.

sources pydantic-field

The sources to load, either an external source or a path to a file (if the pipeline supports that).

PipelineResults pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PipelineResults(BaseModel):
    # Base shape of a source's results. Result-specific subclasses presumably
    # add payload fields — confirm. Descriptions are runtime schema text.
    id: str = Field(description="Vermillio's unique id for this pipeline source.")
    status: PipelineSourceStatus = Field(
        description="The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this source was created."
    )
    # NOTE(review): no default, so pydantic v2 treats this key as required
    # (the value may be null). Confirm that is intended.
    updated_at: Optional[float] = Field(
        description="The last epoch time in seconds of when this source was last updated."
    )
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    source_path: Optional[str] = Field(
        default=None,
        description="The (optional) source path that was provided in the external source."
    )

    @property
    def is_finished(self) -> bool:
        """True once the source has reached a terminal status."""
        # "Errored" is not terminal: per the status description it will be retried.
        return self.status in ("Succeeded", "Failed", "Deleted")
created_at pydantic-field
created_at: float

The epoch time in seconds of when this source was created.

id pydantic-field
id: str

Vermillio's unique id for this pipeline source.

source_id pydantic-field
source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

source_path pydantic-field
source_path: Optional[str] = None

The (optional) source path that was provided in the external source.

status pydantic-field
status: PipelineSourceStatus

The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion.

updated_at pydantic-field
updated_at: Optional[float]

The last epoch time in seconds of when this source was last updated.

PipelineRunRequest pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
37
38
39
40
41
42
43
44
class PipelineRunRequest(BaseModel):
    # Body POSTed by the pipeline client to `/run` and `/run/results` for a
    # single source.
    context: Optional[str] = Field(
        default=None,
        description="Override the default context, if available, to run the source against.",
    )
    # Required. Either an ExternalSource or a plain string (a file path, only
    # if the pipeline supports that).
    source: Union[ExternalSource, str] = Field(
        description="The source to run, either an external source or a path to a file (if the pipeline supports that).",
    )
context pydantic-field
context: Optional[str] = None

Override the default context, if available, to run the source against.

source pydantic-field

The source to run, either an external source or a path to a file (if the pipeline supports that).

PipelineSource pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class PipelineSource(BaseModel):
    # A source as tracked by the pipeline; returned when sources are loaded or
    # listed. Descriptions are runtime schema text.
    id: str = Field(description="Vermillio's unique id for this pipeline source.")
    status: PipelineSourceStatus = Field(
        description="The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this source was created."
    )
    # NOTE(review): no default, so pydantic v2 treats this key as required
    # (the value may be null). Confirm that is intended.
    updated_at: Optional[float] = Field(
        description="The last epoch time in seconds of when this source was last updated."
    )
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    media_id: Optional[str] = Field(
        default=None,
        description="Optional media id to pass through to the underlying pipeline call.",
    )
    asset_id: Optional[str] = Field(
        default=None,
        description="Optional asset id to pass through to the underlying pipeline call.",
    )
asset_id pydantic-field
asset_id: Optional[str] = None

Optional asset id to pass through to the underlying pipeline call.

created_at pydantic-field
created_at: float

The epoch time in seconds of when this source was created.

id pydantic-field
id: str

Vermillio's unique id for this pipeline source.

media_id pydantic-field
media_id: Optional[str] = None

Optional media id to pass through to the underlying pipeline call.

source_id pydantic-field
source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

status pydantic-field
status: PipelineSourceStatus

The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion.

updated_at pydantic-field
updated_at: Optional[float]

The last epoch time in seconds of when this source was last updated.

PipelineStatus pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class PipelineStatus(BaseModel):
    # Per-status counts of sources within one context; presumably the model
    # used with the pipeline client's `/status` call — confirm.
    # NOTE(review): the source statuses include "Deleted", but there is no
    # deleted_count here. Confirm that omission is intentional.
    pending_count: int = Field(
        description="The number of sources under this context with status == 'Pending'"
    )
    running_count: int = Field(
        description="The number of sources under this context with status == 'Running'"
    )
    succeeded_count: int = Field(
        description="The number of sources under this context with status == 'Succeeded'"
    )
    errored_count: int = Field(
        description="The number of sources under this context with status == 'Errored'"
    )
    failed_count: int = Field(
        description="The number of sources under this context with status == 'Failed'"
    )
errored_count pydantic-field
errored_count: int

The number of sources under this context with status == 'Errored'

failed_count pydantic-field
failed_count: int

The number of sources under this context with status == 'Failed'

pending_count pydantic-field
pending_count: int

The number of sources under this context with status == 'Pending'

running_count pydantic-field
running_count: int

The number of sources under this context with status == 'Running'

succeeded_count pydantic-field
succeeded_count: int

The number of sources under this context with status == 'Succeeded'

VermillioConfig pydantic-model

Bases: OAuthCredentials

Fields:

  • client_id (str)
  • client_secret (str)
  • audience (str)
  • env (str)
Source code in packages/core/src/vermillio/sdk/core/models/config.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class VermillioConfig(OAuthCredentials):
    # "prod" selects production URLs; any other value selects non-production
    # URLs (see the *_url properties below).
    env: str = "prod"

    @classmethod
    def default(cls):
        """
        Gets the default global config.
        If none is set, defaults to `from_env`
        Returns:
            VermillioConfig: The default VermillioConfig object.
        """
        global _default
        if _default is None:
            _default = cls.from_env()
        return _default

    @classmethod
    def set_default(cls, config: "VermillioConfig"):
        """
        Sets the default global config, used when no config is provided to a client.
        """
        # Must write the same module-level `_default` that `default()` reads.
        # Assigning `cls._default` left `default()` unaffected.
        global _default
        _default = config

    @classmethod
    def from_env(cls, env_prefix: str = "VERMILLIO_SDK_") -> "VermillioConfig":
        """
        Extracts a VermillioConfig from environment variables with the specified prefix.
        Args:
            env_prefix (str): Prefix used to construct final environment vars:
                              client_id = f"{env_prefix}CLIENT_ID"
                              client_secret = f"{env_prefix}CLIENT_SECRET"
                              env = f"{env_prefix}ENV" (optional, defaults to "prod")
        Raises:
            AssertionError: if the client id or secret variable is missing or empty.
        """
        # Checks raise explicitly rather than via `assert`, so they survive
        # `python -O`. AssertionError is kept for backward compatibility.
        key = f"{env_prefix}CLIENT_ID"
        client_id = os.environ.get(key)
        if not client_id:
            raise AssertionError(f"Missing required env var: {key}")
        key = f"{env_prefix}CLIENT_SECRET"
        client_secret = os.environ.get(key)
        if not client_secret:
            raise AssertionError(f"Missing required env var: {key}")
        env = os.environ.get(f"{env_prefix}ENV", "prod")
        return cls(
            env=env,
            client_id=client_id,
            client_secret=client_secret,
        )

    @classmethod
    def credentials(
        cls, client_id: str, client_secret: str, env: str = "prod"
    ) -> "VermillioConfig":
        """Build a config from explicit credentials (``cls`` keeps subclasses intact)."""
        return cls(
            env=env,
            client_id=client_id,
            client_secret=client_secret,
        )

    @property
    def prod(self) -> bool:
        """
        Whether or not this is targeting production environment.
        Returns:
            bool: True if pointed at production, False otherwise.
        """
        return self.env == "prod"

    @property
    def auth_base_url(self) -> str:
        """Auth server URL; VERMILLIO_AUTH_URL overrides it. All non-prod envs share the staging host."""
        url = (
            "https://auth.cloud.vermill.io"
            if self.prod
            else "https://staging.auth.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_AUTH_URL", url)

    @property
    def trace_base_url(self) -> str:
        """Trace API URL; VERMILLIO_TRACE_BASE_URL overrides it. Non-prod embeds ``env``."""
        url = (
            "https://application.tce.cloud.vermill.io"
            if self.prod
            else f"https://application.tce.{self.env}.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_TRACE_BASE_URL", url)

    @property
    def external_pipeline_url(self) -> str:
        """External pipeline API URL; VERMILLIO_EXTERNAL_PIPELINE_URL overrides it. Non-prod embeds ``env``."""
        url = (
            "https://external-pipeline.tce.cloud.vermill.io"
            if self.prod
            else f"https://external-pipeline.tce.{self.env}.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_EXTERNAL_PIPELINE_URL", url)
prod property
prod: bool

Whether or not this is targeting production environment. Returns: bool: True if pointed at production, False otherwise.

default classmethod
default()

Gets the default global config. If none is set, defaults to from_env Returns: VermillioConfig: The default VermillioConfig object.

Source code in packages/core/src/vermillio/sdk/core/models/config.py
10
11
12
13
14
15
16
17
18
19
20
21
@classmethod
def default(cls):
    """
    Return the process-wide default config, creating it on first use.

    When no default has been stored yet, one is built with ``cls.from_env()``
    and cached in the module-level ``_default`` for later calls.

    Returns:
        VermillioConfig: The default VermillioConfig object.
    """
    global _default
    if _default is not None:
        return _default
    _default = cls.from_env()
    return _default
from_env classmethod
from_env(
    env_prefix: str = "VERMILLIO_SDK_",
) -> VermillioConfig

Extracts a VermillioConfig from environment variables with the specified prefix. Args: env_prefix (str): Prefix used to construct final environment vars: client_id = f"{env_prefix}CLIENT_ID" client_secret = f"{env_prefix}CLIENT_SECRET"

Source code in packages/core/src/vermillio/sdk/core/models/config.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def from_env(cls, env_prefix: str = "VERMILLIO_SDK_") -> "VermillioConfig":
    """
    Extracts a VermillioConfig from environment variables with the specified prefix.
    Args:
        env_prefix (str): Prefix used to construct final environment vars:
                          client_id = f"{env_prefix}CLIENT_ID"
                          client_secret = f"{env_prefix}CLIENT_SECRET"
                          env = f"{env_prefix}ENV" (optional, defaults to "prod")
    Raises:
        AssertionError: if the client id or secret variable is missing or empty.
    """
    # Checks raise explicitly rather than via `assert`, so they survive
    # `python -O`. AssertionError is kept for backward compatibility.
    key = f"{env_prefix}CLIENT_ID"
    client_id = os.environ.get(key)
    if not client_id:
        raise AssertionError(f"Missing required env var: {key}")
    key = f"{env_prefix}CLIENT_SECRET"
    client_secret = os.environ.get(key)
    if not client_secret:
        raise AssertionError(f"Missing required env var: {key}")
    env = os.environ.get(f"{env_prefix}ENV", "prod")
    return cls(
        env=env,
        client_id=client_id,
        client_secret=client_secret,
    )
set_default classmethod
set_default(config: VermillioConfig)

Sets the default global config, used when no config is provided to a client.

Source code in packages/core/src/vermillio/sdk/core/models/config.py
23
24
25
26
27
28
@classmethod
def set_default(cls, config: "VermillioConfig"):
    """
    Sets the default global config, used when no config is provided to a client.
    """
    # Must write the same module-level `_default` that `default()` reads.
    # Assigning `cls._default` left `default()` unaffected.
    global _default
    _default = config

config

VermillioConfig pydantic-model

Bases: OAuthCredentials

Fields:

  • client_id (str)
  • client_secret (str)
  • audience (str)
  • env (str)
Source code in packages/core/src/vermillio/sdk/core/models/config.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class VermillioConfig(OAuthCredentials):
    # Target environment. "prod" selects the production hosts; any other value
    # selects non-production hosts (see the *_url properties below). The OAuth
    # fields (client_id, client_secret, audience) come from OAuthCredentials.
    env: str = "prod"

    @classmethod
    def default(cls) -> "VermillioConfig":
        """
        Gets the default global config.
        If none is set, defaults to `from_env` and caches the result.

        The config is stored on the class as `_default`. This is the same
        attribute `set_default` writes, so a config installed with
        `set_default` is the one returned here.
        Returns:
            VermillioConfig: The default VermillioConfig object.
        Raises:
            AssertionError: If no default is set and `from_env` cannot find the
                required environment variables.
        """
        # The attribute does not exist until `set_default` or an earlier
        # `default()` call has assigned it, hence the getattr fallback.
        config = getattr(cls, "_default", None)
        if config is None:
            config = cls.from_env()
            cls._default = config
        return config

    @classmethod
    def set_default(cls, config: "VermillioConfig"):
        """
        Sets the default global config, used when no config is provided to a client.
        The config is stored on the class as `_default` and returned by `default()`.
        Args:
            config (VermillioConfig): The config to install as the default.
        """
        cls._default = config

    @classmethod
    def from_env(cls, env_prefix: str = "VERMILLIO_SDK_") -> "VermillioConfig":
        """
        Extracts a VermillioConfig from environment variables with the specified prefix.
        Args:
            env_prefix (str): Prefix used to construct final environment vars:
                              client_id = f"{env_prefix}CLIENT_ID"       (required)
                              client_secret = f"{env_prefix}CLIENT_SECRET" (required)
                              env = f"{env_prefix}ENV"  (optional, defaults to "prod")
        Returns:
            VermillioConfig: A config built from the environment.
        Raises:
            AssertionError: If a required variable is unset or empty.
        """

        def _require(suffix: str) -> str:
            # Explicit raise rather than `assert`: asserts are stripped under
            # `python -O`, which would let a missing credential through silently.
            # AssertionError is kept so existing callers that catch it still work.
            key = f"{env_prefix}{suffix}"
            value = os.environ.get(key)
            if not value:
                raise AssertionError(f"Missing required env var: {key}")
            return value

        client_id = _require("CLIENT_ID")
        client_secret = _require("CLIENT_SECRET")
        env = os.environ.get(f"{env_prefix}ENV", "prod")
        return cls(
            env=env,
            client_id=client_id,
            client_secret=client_secret,
        )

    @classmethod
    def credentials(
        cls, client_id: str, client_secret: str, env: str = "prod"
    ) -> "VermillioConfig":
        """
        Builds a config from explicit credentials.
        Args:
            client_id (str): OAuth client id.
            client_secret (str): OAuth client secret.
            env (str): Target environment; defaults to "prod".
        Returns:
            VermillioConfig: A new config. It is built with `cls`, so subclasses
            get an instance of their own type back, consistent with `from_env`.
        """
        return cls(
            env=env,
            client_id=client_id,
            client_secret=client_secret,
        )

    @property
    def prod(self) -> bool:
        """
        Whether or not this is targeting production environment.
        Returns:
            bool: True if pointed at production, False otherwise.
        """
        return self.env == "prod"

    @property
    def auth_base_url(self) -> str:
        """
        Base URL of the auth service.
        Production uses https://auth.cloud.vermill.io; every other env uses the
        staging auth host. The `VERMILLIO_AUTH_URL` env var overrides both.
        Returns:
            str: The auth base URL.
        """
        # NOTE(review): unlike the other *_url properties, the non-prod host does
        # not include `self.env` — confirm this is intended.
        url = (
            "https://auth.cloud.vermill.io"
            if self.prod
            else "https://staging.auth.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_AUTH_URL", url)

    @property
    def trace_base_url(self) -> str:
        """
        Base URL of the trace application service.
        Non-prod envs interpolate `env` into the hostname. The
        `VERMILLIO_TRACE_BASE_URL` env var overrides the computed value.
        Returns:
            str: The trace base URL.
        """
        url = (
            "https://application.tce.cloud.vermill.io"
            if self.prod
            else f"https://application.tce.{self.env}.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_TRACE_BASE_URL", url)

    @property
    def external_pipeline_url(self) -> str:
        """
        Base URL of the external pipeline service.
        Non-prod envs interpolate `env` into the hostname. The
        `VERMILLIO_EXTERNAL_PIPELINE_URL` env var overrides the computed value.
        Returns:
            str: The external pipeline base URL.
        """
        url = (
            "https://external-pipeline.tce.cloud.vermill.io"
            if self.prod
            else f"https://external-pipeline.tce.{self.env}.cloud.vermill.io"
        )
        return os.getenv("VERMILLIO_EXTERNAL_PIPELINE_URL", url)
prod property
prod: bool

Whether or not this config targets the production environment.

Returns: bool: True if pointed at production, False otherwise.

default classmethod
default()

Gets the default global config. If none is set, falls back to `from_env`.

Returns: VermillioConfig: The default VermillioConfig object.

Source code in packages/core/src/vermillio/sdk/core/models/config.py
10
11
12
13
14
15
16
17
18
19
20
21
@classmethod
def default(cls):
    """
    Gets the default global config.
    If none is set, defaults to `from_env` and caches the result.

    The config is stored on the class as `_default`. This is the same
    attribute `set_default` writes, so a config installed with `set_default`
    is the one returned here.
    Returns:
        VermillioConfig: The default VermillioConfig object.
    """
    # The attribute does not exist until `set_default` or an earlier
    # `default()` call has assigned it, hence the getattr fallback.
    config = getattr(cls, "_default", None)
    if config is None:
        config = cls.from_env()
        cls._default = config
    return config
from_env classmethod
from_env(
    env_prefix: str = "VERMILLIO_SDK_",
) -> VermillioConfig

Extracts a VermillioConfig from environment variables with the specified prefix.

Args: env_prefix (str): Prefix used to construct the final environment variable names: client_id = f"{env_prefix}CLIENT_ID" and client_secret = f"{env_prefix}CLIENT_SECRET" (both required), plus env = f"{env_prefix}ENV" (optional, defaults to "prod").

Source code in packages/core/src/vermillio/sdk/core/models/config.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def from_env(cls, env_prefix: str = "VERMILLIO_SDK_") -> "VermillioConfig":
    """
    Extracts a VermillioConfig from environment variables with the specified prefix.
    Args:
        env_prefix (str): Prefix used to construct final environment vars:
                          client_id = f"{env_prefix}CLIENT_ID"       (required)
                          client_secret = f"{env_prefix}CLIENT_SECRET" (required)
                          env = f"{env_prefix}ENV"  (optional, defaults to "prod")
    Returns:
        VermillioConfig: A config built from the environment.
    Raises:
        AssertionError: If a required variable is unset or empty.
    """

    def _require(suffix: str) -> str:
        # Explicit raise rather than `assert`: asserts are stripped under
        # `python -O`, which would let a missing credential through silently.
        # AssertionError is kept so existing callers that catch it still work.
        key = f"{env_prefix}{suffix}"
        value = os.environ.get(key)
        if not value:
            raise AssertionError(f"Missing required env var: {key}")
        return value

    client_id = _require("CLIENT_ID")
    client_secret = _require("CLIENT_SECRET")
    env = os.environ.get(f"{env_prefix}ENV", "prod")
    return cls(
        env=env,
        client_id=client_id,
        client_secret=client_secret,
    )
set_default classmethod
set_default(config: VermillioConfig)

Sets the default global config, used when no config is provided to a client.

Source code in packages/core/src/vermillio/sdk/core/models/config.py
23
24
25
26
27
28
@classmethod
def set_default(cls, config: "VermillioConfig"):
    """
    Installs `config` as the default global config.

    It is intended as the config used when no config is provided to a client.
    The value is stored on the class under the `_default` attribute.
    Args:
        config (VermillioConfig): The config to install as the default.
    """
    setattr(cls, "_default", config)

pipelines

ExternalPipeline pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class ExternalPipeline(BaseModel):
    # Metadata describing a pipeline: identity, creation time and contexts.
    # Fields use `#` comments rather than a class docstring, because pydantic
    # would surface a docstring as the schema description.
    id: str = Field(..., description="Unique id of the pipeline.")
    name: str = Field(..., description="The friendly name of the pipeline.")
    description: Optional[str] = Field(
        None,
        description="Description of what the purpose of this pipeline is.",
    )
    created_at: float = Field(
        ...,
        description="The epoch time in seconds of when this pipeline was created.",
    )
    contexts: list[str] = Field(
        ...,
        description="The list of contexts available in this pipeline.",
    )
    default_context: str = Field(
        ...,
        description="The default context used if none is provided in endpoints.",
    )
contexts pydantic-field
contexts: list[str]

The list of contexts available in this pipeline.

created_at pydantic-field
created_at: float

The epoch time in seconds of when this pipeline was created.

default_context pydantic-field
default_context: str

The default context used if none is provided in endpoints.

description pydantic-field
description: Optional[str] = None

Description of what the purpose of this pipeline is.

id pydantic-field
id: str

Unique id of the pipeline.

name pydantic-field
name: str

The friendly name of the pipeline.

ExternalSource pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
22
23
24
25
26
27
28
29
30
31
32
33
34
class ExternalSource(BaseModel):
    # Identifiers describing a source to submit to a pipeline; all are optional
    # and default to None.
    source_id: Optional[str] = Field(
        None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    media_id: Optional[str] = Field(
        None,
        description="Optional media id to pass through to the underlying pipeline call.",
    )
    asset_id: Optional[str] = Field(
        None,
        description="Optional asset id to pass through to the underlying pipeline call.",
    )
asset_id pydantic-field
asset_id: Optional[str] = None

Optional asset id to pass through to the underlying pipeline call.

media_id pydantic-field
media_id: Optional[str] = None

Optional media id to pass through to the underlying pipeline call.

source_id pydantic-field
source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

PipelineLoadRequest pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
47
48
49
50
51
52
53
54
class PipelineLoadRequest(BaseModel):
    # Request carrying a list of sources to load, optionally against a
    # non-default context.
    context: Optional[str] = Field(
        None,
        description="Override the default context, if available, to load the sources against.",
    )
    sources: Union[list[ExternalSource], list[str]] = Field(
        ...,
        description="The sources to load, either an external source or a path to a file (if the pipeline supports that).",
    )
context pydantic-field
context: Optional[str] = None

Override the default context, if available, to load the sources against.

sources pydantic-field
sources: Union[list[ExternalSource], list[str]]

The sources to load, either an external source or a path to a file (if the pipeline supports that).

PipelineResults pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PipelineResults(BaseModel):
    # Processing record for one source submitted to a pipeline.
    id: str = Field(description="Vermillio's unique id for this pipeline source.")
    status: PipelineSourceStatus = Field(
        description="The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this source was created."
    )
    # `default=None`, like every other Optional field in these models. Without a
    # default, pydantic v2 treats the field as required, so a payload that omits
    # `updated_at` would fail validation.
    updated_at: Optional[float] = Field(
        default=None,
        description="The last epoch time in seconds of when this source was last updated.",
    )
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    source_path: Optional[str] = Field(
        default=None,
        description="The (optional) source path that was provided in the external source."
    )

    @property
    def is_finished(self) -> bool:
        """
        Whether the source has reached a terminal status.
        Returns:
            bool: True for Succeeded, Failed or Deleted. Any other status
            (e.g. Pending, Running, or Errored, which is retried) is not finished.
        """
        return self.status in ("Succeeded", "Failed", "Deleted")
created_at pydantic-field
created_at: float

The epoch time in seconds of when this source was created.

id pydantic-field
id: str

Vermillio's unique id for this pipeline source.

source_id pydantic-field
source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

source_path pydantic-field
source_path: Optional[str] = None

The (optional) source path that was provided in the external source.

status pydantic-field
status: PipelineSourceStatus

The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion.

updated_at pydantic-field
updated_at: Optional[float]

The last epoch time in seconds of when this source was last updated.

PipelineRunRequest pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
37
38
39
40
41
42
43
44
class PipelineRunRequest(BaseModel):
    # Request carrying a single source to run, optionally against a
    # non-default context.
    context: Optional[str] = Field(
        None,
        description="Override the default context, if available, to run the source against.",
    )
    source: Union[ExternalSource, str] = Field(
        ...,
        description="The source to run, either an external source or a path to a file (if the pipeline supports that).",
    )
context pydantic-field
context: Optional[str] = None

Override the default context, if available, to run the source against.

source pydantic-field
source: Union[ExternalSource, str]

The source to run, either an external source or a path to a file (if the pipeline supports that).

PipelineSource pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class PipelineSource(BaseModel):
    # Record for one source submitted to a pipeline, including the optional
    # pass-through media/asset ids.
    id: str = Field(description="Vermillio's unique id for this pipeline source.")
    status: PipelineSourceStatus = Field(
        description="The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion."
    )
    created_at: float = Field(
        description="The epoch time in seconds of when this source was created."
    )
    # `default=None`, like every other Optional field in these models. Without a
    # default, pydantic v2 treats the field as required, so a payload that omits
    # `updated_at` would fail validation.
    updated_at: Optional[float] = Field(
        default=None,
        description="The last epoch time in seconds of when this source was last updated.",
    )
    source_id: Optional[str] = Field(
        default=None,
        description="A (optional) unique id for the source that represents this request in your system.",
    )
    media_id: Optional[str] = Field(
        default=None,
        description="Optional media id to pass through to the underlying pipeline call.",
    )
    asset_id: Optional[str] = Field(
        default=None,
        description="Optional asset id to pass through to the underlying pipeline call.",
    )
asset_id pydantic-field
asset_id: Optional[str] = None

Optional asset id to pass through to the underlying pipeline call.

created_at pydantic-field
created_at: float

The epoch time in seconds of when this source was created.

id pydantic-field
id: str

Vermillio's unique id for this pipeline source.

media_id pydantic-field
media_id: Optional[str] = None

Optional media id to pass through to the underlying pipeline call.

source_id pydantic-field
source_id: Optional[str] = None

A (optional) unique id for the source that represents this request in your system.

status pydantic-field
status: PipelineSourceStatus

The status of the source, Pending=Awaiting processing. Running=Currently processing. Succeeded=Finished successfully. Errored=Unexpected error occurred, will be retried. Failed=Finished unsuccessfully. Deleted=Marked for deletion.

updated_at pydantic-field
updated_at: Optional[float]

The last epoch time in seconds of when this source was last updated.

PipelineStatus pydantic-model

Bases: BaseModel

Fields:

Source code in packages/core/src/vermillio/sdk/core/models/pipelines.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class PipelineStatus(BaseModel):
    # Per-status source counts for a pipeline context; every count is required.
    pending_count: int = Field(
        ...,
        description="The number of sources under this context with status == 'Pending'",
    )
    running_count: int = Field(
        ...,
        description="The number of sources under this context with status == 'Running'",
    )
    succeeded_count: int = Field(
        ...,
        description="The number of sources under this context with status == 'Succeeded'",
    )
    errored_count: int = Field(
        ...,
        description="The number of sources under this context with status == 'Errored'",
    )
    failed_count: int = Field(
        ...,
        description="The number of sources under this context with status == 'Failed'",
    )
errored_count pydantic-field
errored_count: int

The number of sources under this context with status == 'Errored'

failed_count pydantic-field
failed_count: int

The number of sources under this context with status == 'Failed'

pending_count pydantic-field
pending_count: int

The number of sources under this context with status == 'Pending'

running_count pydantic-field
running_count: int

The number of sources under this context with status == 'Running'

succeeded_count pydantic-field
succeeded_count: int

The number of sources under this context with status == 'Succeeded'