Models
Analyses
- class onecodex.models.analysis.Analyses(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef], analysis_type: str)
Bases: _AnalysesBase, AnalysisSchema
- await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase
Poll the API until the analysis reaches a terminal state.
An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.
Parameters
- timeout: float, optional
Maximum number of seconds to wait. None (default) waits indefinitely.
- initial_interval: int, optional
Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.
- max_interval: int, optional
Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.
- backoff: float, optional
Multiplier applied to the interval after each poll. Defaults to 1.5.
Returns
self : the analysis, refreshed to its terminal state.
Raises
- TimeoutError
If timeout elapses before the analysis completes.
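The polling loop inside await_completion is not shown in this reference. As a rough sketch of how the documented parameters could interact, the snippet below assumes the wait starts at initial_interval, is multiplied by backoff after each poll, and is capped at max_interval. The helper polling_intervals is hypothetical and not part of onecodex.

```python
def polling_intervals(initial_interval=5, max_interval=120, backoff=1.5, polls=10):
    """Return the assumed wait (in seconds) before each of the first `polls` polls."""
    intervals = []
    interval = float(initial_interval)
    for _ in range(polls):
        # Never wait longer than max_interval, however far the backoff has grown.
        intervals.append(min(interval, float(max_interval)))
        interval *= backoff
    return intervals


# With the documented defaults the wait grows 5 -> 7.5 -> 11.25 ... and then stays at 120.
schedule = polling_intervals(polls=10)
print(schedule)
```

Under these assumptions the cap is reached on the ninth poll, so a long-running analysis would be polled roughly every two minutes after the first few minutes.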
- download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str
Download analysis result file.
Parameters
- filepath: str or FileDetailSchema
Must be one of the objects or file paths returned by get_files.
- out_path: string, optional
Full path to save the file to. If omitted, defaults to the original filename in the current working directory.
- out_file_obj: file-like object, optional
Rather than saving the file to an out_path, write it to this file-like object.
- progressbar: bool, optional
If True, display a Click progress bar during the download.
Returns
- string
The path the file was downloaded to, if applicable. Otherwise, None.
Notes
Existing paths will not be overwritten.
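As an illustration of how get_files and download_file fit together, the sketch below downloads every result file of an analysis into the current working directory. It relies only on the signatures documented above; download_all_files and the stand-in class are hypothetical names introduced here so the example runs without API access, and what happens when a target path already exists is not specified in this reference.

```python
def download_all_files(analysis):
    """Download every result file of `analysis`, returning the saved paths."""
    saved_paths = []
    for file_detail in analysis.get_files():
        # download_file accepts the objects returned by get_files directly.
        path = analysis.download_file(file_detail)
        if path is not None:
            saved_paths.append(path)
    return saved_paths


class FakeAnalysisWithFiles:
    """Stand-in for an Analyses object (assumption: for demonstration only)."""

    def get_files(self):
        return ["report.tsv", "summary.json"]

    def download_file(self, filepath, out_path=None, out_file_obj=None, progressbar=False):
        # A real call would write the file; here we just echo the path back.
        return filepath


saved = download_all_files(FakeAnalysisWithFiles())
print(saved)
```

With a real analysis object, the same helper would be called as download_all_files(analysis).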
- get_files() List[FileDetailSchema]
Fetch the file details of an Analyses resource.
Returns
A list of FileDetailSchema
- logs(tail: int | None = None) str
Fetch the job run logs for this analysis.
Parameters
- tail: int, optional
If set, return only the last tail lines of the logs. Must be >= 1.
Returns
- str
The job run logs as a plain-text string. Empty if the analysis has not produced any logs.
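One plausible use of logs is to summarize why an analysis failed. The sketch below treats an analysis as failed when complete is True and success is False, then combines error_msg with the last few log lines. failure_report and the stand-in class are hypothetical names, not part of onecodex.

```python
def failure_report(analysis, tail=50):
    """Return a short text report for a failed analysis, or None otherwise."""
    if not (analysis.complete and analysis.success is False):
        return None
    log_text = analysis.logs(tail=tail)
    if not log_text:
        # logs() is documented to return an empty string when nothing was logged.
        log_text = "(no logs produced)"
    return f"error_msg: {analysis.error_msg}\n{log_text}"


class FakeFailedAnalysis:
    """Stand-in for a failed Analyses object (assumption: for demonstration only)."""

    complete = True
    success = False
    error_msg = "out of memory"

    def logs(self, tail=None):
        lines = ["step 1", "step 2", "killed"]
        return "\n".join(lines if tail is None else lines[-tail:])


report = failure_report(FakeFailedAnalysis(), tail=2)
print(report)
```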
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- refresh() None
Fetch the current state from the API and update this object’s state fields in-place.
- results(json: bool = True)
Fetch the results of an Analyses resource.
Parameters
- json: bool, optional
If True, return the raw JSON API result. Defaults to True.
Returns
Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]
Query analyses.
An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).
Examples
Find completed analyses since yesterday:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})
Find failed analyses (terminal but unsuccessful):
ocx.Analyses.where(complete=True, success=False)
Find all analyses for a given sample:
ocx.Analyses.where(sample=sample)
See OneCodexBase.where() for the full operator reference.
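The date filter in the examples above can be factored into a small helper. This is a minimal sketch: created_since is a hypothetical name, and it only builds the {"$gte": ...} dictionary shown in the examples, without calling the API.

```python
from datetime import datetime, timedelta, timezone


def created_since(days, now=None):
    """Build a created_at filter matching records from the last `days` days."""
    now = now or datetime.now(timezone.utc)
    since = (now - timedelta(days=days)).isoformat()
    return {"$gte": since}


# A fixed reference time keeps the example reproducible.
fixed_now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
recent = created_since(1, now=fixed_now)
print(recent)
```

The result could then be passed as ocx.Analyses.where(complete=True, created_at=created_since(1)), with ocx being the same API client used in the examples above.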
Alignments
- class onecodex.models.analysis.Alignments(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])
Bases: _AnalysesBase, AlignmentSchema
- await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase
Poll the API until the analysis reaches a terminal state.
An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.
Parameters
- timeout: float, optional
Maximum number of seconds to wait. None (default) waits indefinitely.
- initial_interval: int, optional
Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.
- max_interval: int, optional
Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.
- backoff: float, optional
Multiplier applied to the interval after each poll. Defaults to 1.5.
Returns
self : the analysis, refreshed to its terminal state.
Raises
- TimeoutError
If timeout elapses before the analysis completes.
- download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str
Download analysis result file.
Parameters
- filepath: str or FileDetailSchema
Must be one of the objects or file paths returned by get_files.
- out_path: string, optional
Full path to save the file to. If omitted, defaults to the original filename in the current working directory.
- out_file_obj: file-like object, optional
Rather than saving the file to an out_path, write it to this file-like object.
- progressbar: bool, optional
If True, display a Click progress bar during the download.
Returns
- string
The path the file was downloaded to, if applicable. Otherwise, None.
Notes
Existing paths will not be overwritten.
- get_files() List[FileDetailSchema]
Fetch the file details of an Analyses resource.
Returns
A list of FileDetailSchema
- logs(tail: int | None = None) str
Fetch the job run logs for this analysis.
Parameters
- tail: int, optional
If set, return only the last tail lines of the logs. Must be >= 1.
Returns
- str
The job run logs as a plain-text string. Empty if the analysis has not produced any logs.
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- refresh() None
Fetch the current state from the API and update this object’s state fields in-place.
- results(json: bool = True)
Fetch the results of an Analyses resource.
Parameters
- json: bool, optional
If True, return the raw JSON API result. Defaults to True.
Returns
Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]
Query analyses.
An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).
Examples
Find completed analyses since yesterday:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})
Find failed analyses (terminal but unsuccessful):
ocx.Analyses.where(complete=True, success=False)
Find all analyses for a given sample:
ocx.Analyses.where(sample=sample)
See OneCodexBase.where() for the full operator reference.
Classifications
- class onecodex.models.analysis.Classifications(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef], results_uri: str | None = None)
Bases: _AnalysesBase, ClassificationSchema
- await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase
Poll the API until the analysis reaches a terminal state.
An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.
Parameters
- timeout: float, optional
Maximum number of seconds to wait. None (default) waits indefinitely.
- initial_interval: int, optional
Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.
- max_interval: int, optional
Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.
- backoff: float, optional
Multiplier applied to the interval after each poll. Defaults to 1.5.
Returns
self : the analysis, refreshed to its terminal state.
Raises
- TimeoutError
If timeout elapses before the analysis completes.
- download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str
Download analysis result file.
Parameters
- filepath: str or FileDetailSchema
Must be one of the objects or file paths returned by get_files.
- out_path: string, optional
Full path to save the file to. If omitted, defaults to the original filename in the current working directory.
- out_file_obj: file-like object, optional
Rather than saving the file to an out_path, write it to this file-like object.
- progressbar: bool, optional
If True, display a Click progress bar during the download.
Returns
- string
The path the file was downloaded to, if applicable. Otherwise, None.
Notes
Existing paths will not be overwritten.
- get_files() List[FileDetailSchema]
Fetch the file details of an Analyses resource.
Returns
A list of FileDetailSchema
- logs(tail: int | None = None) str
Fetch the job run logs for this analysis.
Parameters
- tail: int, optional
If set, return only the last tail lines of the logs. Must be >= 1.
Returns
- str
The job run logs as a plain-text string. Empty if the analysis has not produced any logs.
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- refresh() None
Fetch the current state from the API and update this object’s state fields in-place.
- results(json: bool = True) dict | pd.DataFrame
Return the complete results table for a classification.
Parameters
- json: bool, optional
Return result as JSON? Default True.
Returns
- table: dict or pd.DataFrame
A JSON object with the classification results, or a pd.DataFrame if json=False.
- table() pd.DataFrame
Return the complete results table for the classification.
Returns
- table: pd.DataFrame
A Pandas DataFrame of the classification results.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>) SampleCollection
Query classifications and return a SampleCollection.
Classifications are taxonomic results, typically the One Codex Database run against each sample. Filters mirror those on Analyses.where().
Examples
Find recent successful classifications:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Classifications.where(
    complete=True,
    success=True,
    created_at={"$gte": since},
)
Find the classification for a specific sample:
cls_run = ocx.Classifications.where(sample=sample)[0]
See OneCodexBase.where() for the full operator reference.
FunctionalProfiles
- class onecodex.models.analysis.FunctionalProfiles(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])
Bases: _AnalysesBase, FunctionalRunSchema
- await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase
Poll the API until the analysis reaches a terminal state.
An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.
Parameters
- timeout: float, optional
Maximum number of seconds to wait. None (default) waits indefinitely.
- initial_interval: int, optional
Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.
- max_interval: int, optional
Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.
- backoff: float, optional
Multiplier applied to the interval after each poll. Defaults to 1.5.
Returns
self : the analysis, refreshed to its terminal state.
Raises
- TimeoutError
If timeout elapses before the analysis completes.
- download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str
Download analysis result file.
Parameters
- filepath: str or FileDetailSchema
Must be one of the objects or file paths returned by get_files.
- out_path: string, optional
Full path to save the file to. If omitted, defaults to the original filename in the current working directory.
- out_file_obj: file-like object, optional
Rather than saving the file to an out_path, write it to this file-like object.
- progressbar: bool, optional
If True, display a Click progress bar during the download.
Returns
- string
The path the file was downloaded to, if applicable. Otherwise, None.
Notes
Existing paths will not be overwritten.
- filtered_table(annotation: FunctionalAnnotations, metric: FunctionalAnnotationsMetric, taxa_stratified: bool = True)
Return a results table for the functional analysis.
Parameters
- annotation: onecodex.lib.enum.FunctionalAnnotations, required
Return a table for the given onecodex.lib.enum.FunctionalAnnotations value.
- metric: onecodex.lib.enum.FunctionalAnnotationsMetric, required
Return a table for the given onecodex.lib.enum.FunctionalAnnotationsMetric value.
- taxa_stratified: bool, optional
If False, return data only by annotation ID, ignoring taxonomic stratification.
Returns
- results_df: pd.DataFrame
A Pandas DataFrame of the functional results.
- get_files() List[FileDetailSchema]
Fetch the file details of an Analyses resource.
Returns
A list of FileDetailSchema
- logs(tail: int | None = None) str
Fetch the job run logs for this analysis.
Parameters
- tail: int, optional
If set, return only the last tail lines of the logs. Must be >= 1.
Returns
- str
The job run logs as a plain-text string. Empty if the analysis has not produced any logs.
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- refresh() None
Fetch the current state from the API and update this object’s state fields in-place.
- results(json: bool = True)
Return the complete results table for a functional analysis.
Parameters
- json: bool, optional
Return result as JSON? Default True.
Returns
- table: dict or pd.DataFrame
A JSON object with the functional analysis results, or a pd.DataFrame if json=False.
- table(annotation: FunctionalAnnotations | None = None, taxa_stratified: bool = True)
Return a results table for the functional analysis.
Parameters
- annotation: {None, onecodex.lib.enum.FunctionalAnnotations}, optional
If None, return a table with all annotations; otherwise filter to one of onecodex.lib.enum.FunctionalAnnotations.
- taxa_stratified: bool, optional
If False, return data only by annotation ID, ignoring taxonomic stratification.
Returns
- results_df: pd.DataFrame
A Pandas DataFrame of the functional results.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]
Query analyses.
An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).
Examples
Find completed analyses since yesterday:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})
Find failed analyses (terminal but unsuccessful):
ocx.Analyses.where(complete=True, success=False)
Find all analyses for a given sample:
ocx.Analyses.where(sample=sample)
See OneCodexBase.where() for the full operator reference.
Panels
- class onecodex.models.analysis.Panels(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])
Bases: _AnalysesBase, PanelSchema
- await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase
Poll the API until the analysis reaches a terminal state.
An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.
Parameters
- timeout: float, optional
Maximum number of seconds to wait. None (default) waits indefinitely.
- initial_interval: int, optional
Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.
- max_interval: int, optional
Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.
- backoff: float, optional
Multiplier applied to the interval after each poll. Defaults to 1.5.
Returns
self : the analysis, refreshed to its terminal state.
Raises
- TimeoutError
If timeout elapses before the analysis completes.
- download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str
Download analysis result file.
Parameters
- filepath: str or FileDetailSchema
Must be one of the objects or file paths returned by get_files.
- out_path: string, optional
Full path to save the file to. If omitted, defaults to the original filename in the current working directory.
- out_file_obj: file-like object, optional
Rather than saving the file to an out_path, write it to this file-like object.
- progressbar: bool, optional
If True, display a Click progress bar during the download.
Returns
- string
The path the file was downloaded to, if applicable. Otherwise, None.
Notes
Existing paths will not be overwritten.
- get_files() List[FileDetailSchema]
Fetch the file details of an Analyses resource.
Returns
A list of FileDetailSchema
- logs(tail: int | None = None) str
Fetch the job run logs for this analysis.
Parameters
- tail: int, optional
If set, return only the last tail lines of the logs. Must be >= 1.
Returns
- str
The job run logs as a plain-text string. Empty if the analysis has not produced any logs.
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- refresh() None
Fetch the current state from the API and update this object’s state fields in-place.
- results(json=True)
Fetch the results of an Analyses resource.
Parameters
- json: bool, optional
If True, return the raw JSON API result. Defaults to True.
Returns
Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]
Query analyses.
An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).
Examples
Find completed analyses since yesterday:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})
Find failed analyses (terminal but unsuccessful):
ocx.Analyses.where(complete=True, success=False)
Find all analyses for a given sample:
ocx.Analyses.where(sample=sample)
See OneCodexBase.where() for the full operator reference.
Mlsts
- class onecodex.models.analysis.Mlsts(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])
Bases: _AnalysesBase, MlstSchema
- await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase
Poll the API until the analysis reaches a terminal state.
An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.
Parameters
- timeout: float, optional
Maximum number of seconds to wait. None (default) waits indefinitely.
- initial_interval: int, optional
Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.
- max_interval: int, optional
Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.
- backoff: float, optional
Multiplier applied to the interval after each poll. Defaults to 1.5.
Returns
self : the analysis, refreshed to its terminal state.
Raises
- TimeoutError
If timeout elapses before the analysis completes.
- download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str
Download analysis result file.
Parameters
- filepath: str or FileDetailSchema
Must be one of the objects or file paths returned by get_files.
- out_path: string, optional
Full path to save the file to. If omitted, defaults to the original filename in the current working directory.
- out_file_obj: file-like object, optional
Rather than saving the file to an out_path, write it to this file-like object.
- progressbar: bool, optional
If True, display a Click progress bar during the download.
Returns
- string
The path the file was downloaded to, if applicable. Otherwise, None.
Notes
Existing paths will not be overwritten.
- get_files() List[FileDetailSchema]
Fetch the file details of an Analyses resource.
Returns
A list of FileDetailSchema
- logs(tail: int | None = None) str
Fetch the job run logs for this analysis.
Parameters
- tail: int, optional
If set, return only the last tail lines of the logs. Must be >= 1.
Returns
- str
The job run logs as a plain-text string. Empty if the analysis has not produced any logs.
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- refresh() None
Fetch the current state from the API and update this object’s state fields in-place.
- results(json=True)
Fetch the results of an Analyses resource.
Parameters
- json: bool, optional
If True, return the raw JSON API result. Defaults to True.
Returns
Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]
Query analyses.
An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).
Examples
Find completed analyses since yesterday:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})
Find failed analyses (terminal but unsuccessful):
ocx.Analyses.where(complete=True, success=False)
Find all analyses for a given sample:
ocx.Analyses.where(sample=sample)
See OneCodexBase.where() for the full operator reference.
Assets
- class onecodex.models.misc.Assets(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], name: str, filename: str, size: int | None = None, status: str, uploader: Users | ApiRef)
Bases: OneCodexBase, AssetSchema, ResourceDownloadMixin
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- classmethod upload(file_path, progressbar=None, name=None)
Upload a file to an asset.
Parameters
- file_path: string
A path to a file on the system.
- progressbar: click.progressbar, optional
If passed, display a progress bar using Click.
- name: string, optional
If passed, name is sent with the upload request and is associated with the asset.
Returns
An Assets object upon successful upload. None if the upload failed.
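Because upload is documented to return None on failure rather than raise, callers may want to check the result explicitly. The sketch below wraps any class exposing the documented upload classmethod; upload_or_raise and the stand-in class are hypothetical names used only so the example runs without API access.

```python
def upload_or_raise(resource_cls, file_path, **kwargs):
    """Call resource_cls.upload and turn a None result into an exception."""
    uploaded = resource_cls.upload(file_path, **kwargs)
    if uploaded is None:
        raise RuntimeError(f"Upload failed for {file_path}")
    return uploaded


class FakeAssets:
    """Stand-in for the Assets class (assumption: for demonstration only)."""

    @classmethod
    def upload(cls, file_path, progressbar=None, name=None):
        # Pretend that files ending in ".bad" fail to upload.
        if file_path.endswith(".bad"):
            return None
        return {"filename": file_path, "name": name}


asset = upload_or_raise(FakeAssets, "reference.fa", name="reference")
print(asset)
```

With a real client, the call would look like upload_or_raise(ocx.Assets, file_path, name=...), assuming ocx is the same API client used in the where examples.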
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, name: str | StrFilter = <UNSET>, filename: str | StrFilter = <UNSET>, size: int | NumFilter | None = <UNSET>, status: str | StrFilter = <UNSET>, uploader: Users | str | RefFilter = <UNSET>) list[Self]
Query assets.
Assets are reusable inputs to custom jobs: reference databases, index files, anything you want available at job run time. They are shared with everyone in your organization.
Examples
Find assets by name:
ocx.Assets.where(name={"$icontains": "kraken"})
Find only assets ready to use (status="available"):
ocx.Assets.where(status="available")
See OneCodexBase.where() for the full operator reference.
Documents
- class onecodex.models.misc.Documents(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], filename: str, size: int | None = None, uploader: Users | ApiRef, downloaders: List[Users | ApiRef] = [])
Bases: OneCodexBase, DocumentSchema, ResourceDownloadMixin
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- classmethod upload(file_path, progressbar=None)
Upload a file to the One Codex server.
Parameters
- file_path: string
A path to a file on the system.
- progressbar: click.progressbar, optional
If passed, display a progress bar using Click.
Returns
A Documents object upon successful upload. None if the upload failed.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, filename: str | StrFilter = <UNSET>, size: int | NumFilter | None = <UNSET>, uploader: Users | str | RefFilter = <UNSET>) list[Self]
Query documents.
Documents are arbitrary files (PDFs, spreadsheets, etc.) uploaded to your account, optionally shared with other users.
Examples
Find documents uploaded since a date:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
ocx.Documents.where(created_at={"$gte": since})
Find documents of at least 10 MB:
ocx.Documents.where(size={"$gte": 10 * 1024 * 1024})
Find documents uploaded by a specific user:
ocx.Documents.where(uploader=user)
See OneCodexBase.where() for the full operator reference.
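The date-window pattern used in the created_at example can be wrapped in a small helper. This is a sketch only: created_since is a hypothetical name, and it simply builds the same {"$gte": ...} dictionary with an ISO 8601 timestamp that the example above passes to where().

```python
from datetime import datetime, timedelta, timezone


def created_since(days, now=None):
    """Build a created_at filter matching records from the last `days` days.

    `now` can be injected for reproducible results; it defaults to the
    current UTC time. Hypothetical helper, not part of onecodex.
    """
    now = now or datetime.now(timezone.utc)
    return {"$gte": (now - timedelta(days=days)).isoformat()}
```

Usage would mirror the example above: ocx.Documents.where(created_at=created_since(7)).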
Jobs
- class onecodex.models.misc.Jobs(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], name: str, job_args_schema: dict[str, Any] = <factory>, analysis_type: str, public: bool, job_type: str | None = None)
Bases: OneCodexBase, JobSchema
- details() JobDetails
Fetch the job’s detail fields and return them as a JobDetails.
Includes script, image_uri, cpu, ram_gb, storage_gb, repository, assets, dependencies, and arguments_schema. Only available for user-created jobs whose details the current user has permission to view.
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, name: str | StrFilter = <UNSET>, analysis_type: str | StrFilter = <UNSET>) list[Self]
Query jobs.
Jobs are runnable workflows, both built-in (e.g. classification) and custom (Nextflow pipelines, shell scripts). Use public=True to include One Codex’s built-in jobs and any other public custom jobs in addition to your own.
Examples
Find your custom jobs by name:
ocx.Jobs.where(name={"$icontains": "amplicon"})
Find all classification-type jobs (yours + public):
ocx.Jobs.where(analysis_type="classification", public=True)
See OneCodexBase.where() for the full operator reference.
Projects
- class onecodex.models.misc.Projects(*, field_uri: str, description: str | None = None, external_id: str | None = None, name: str | None = None, owner: Users | ApiRef, permissions: list[str], project_name: Annotated[str | None, MinLen(min_length=3), MaxLen(max_length=15), _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]{3,15}$')], public: bool = False)
Bases: OneCodexBase, ProjectSchema
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, name: str | StrFilter | None = <UNSET>, project_name: str | StrFilter | None = <UNSET>, description: str | StrFilter | None = <UNSET>, external_id: str | StrFilter | None = <UNSET>, owner: Users | str | RefFilter | None = <UNSET>) list[Self]
Query projects.
Projects group related samples. Set public=True to search across all public projects on One Codex.
Examples
Find a project by exact name:
proj = ocx.Projects.where(name="HMP Phase II")[0]
Find public projects containing a keyword:
ocx.Projects.where(name={"$icontains": "gut"}, public=True)
See OneCodexBase.where() for the full operator reference.
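Indexing a where() result with [0], as in the exact-name example, raises IndexError when nothing matches. A hedged sketch of a safer lookup follows; first_or_none is a hypothetical helper and relies only on the result being iterable.

```python
def first_or_none(results):
    """Return the first item of a query result, or None if it is empty.

    Works on any iterable, so it does not depend on the exact return type
    of where(). Hypothetical helper, not part of onecodex.
    """
    return next(iter(results), None)
```

For example, proj = first_or_none(ocx.Projects.where(name="HMP Phase II")) leaves proj as None when no project has that name.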
Samples
- class onecodex.models.sample.Samples(*, field_uri: str, created_at: datetime, error_msg: str | None = None, filename: str | None = None, metadata: Metadata | ApiRef, owner: Users | ApiRef, primary_classification: Classifications | ApiRef | None = None, project: Projects | ApiRef | None = None, size: int | None = None, tags: List[Tags | ApiRef] = [], status: str, visibility: str, updated_at: datetime | None = None)
Bases: OneCodexBase, _SampleSchema, ResourceDownloadMixin
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- classmethod preupload(metadata=None, tags=None, project=None)
Create a sample in a waiting state; its files are sent later.
Parameters
metadata : dict, optional
- tags : list, optional
A list of optional tags to create. Tags must be passed as dictionaries with a single key, name, whose value is the tag name, e.g., {"name": "my tag"}. New tags will be created on the fly as needed.
- project : string, optional
UUID of project to associate this sample with.
- save()
Send changes on this Samples object to the One Codex server.
Changes to the metadata object and tags list are passed as well.
- classmethod upload(files, metadata=None, tags=None, project=None, coerce_ascii=False, progressbar=None, sample_id=None, external_sample_id=None)
Upload a series of files to the One Codex server.
Parameters
- files : string or tuple
A single path to a file on the system, or a tuple containing a pair of paths. Tuple values will be interleaved as paired-end reads, and both files should contain the same number of records. Paths to single files will be uploaded as-is.
metadata : dict, optional
- tags : list, optional
A list of optional tags to create. Tags must be passed as dictionaries with a single key, name, whose value is the tag name, e.g., {"name": "my tag"}. New tags will be created on the fly as needed.
- project : string, optional
UUID of project to associate this sample with.
- coerce_ascii : bool, optional
If True, rename Unicode filenames to ASCII and issue a warning.
- progressbar : click.progressbar, optional
If passed, display a progress bar using Click.
- sample_id : string, optional
If passed, will upload the file(s) to the sample with that id. Only works if the sample was pre-uploaded.
- external_sample_id : string, optional
If passed, will upload the file(s) to the sample with that metadata external id. Only works if the sample was pre-uploaded.
Returns
A Samples object upon successful upload. None if the upload failed.
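The preupload and upload methods can be combined into a two-step flow: reserve the sample first, then send the files to it by id. The sketch below is illustrative only. tag_dicts and preupload_then_upload are hypothetical helpers, the class is passed in as samples_cls (for example ocx.Samples), and the returned sample is assumed to expose its identifier as an id attribute.

```python
def tag_dicts(names):
    """Convert plain tag names into the {"name": ...} dictionaries that the
    tags parameter is documented to expect."""
    return [{"name": name} for name in names]


def preupload_then_upload(samples_cls, files, tag_names):
    """Pre-upload a sample, then upload its files to that sample.

    `files` is a single path or a tuple with a pair of paths for
    paired-end reads, as described for upload(). Hypothetical helper.
    """
    # Step 1: create the sample record before any file data is sent.
    sample = samples_cls.preupload(tags=tag_dicts(tag_names))
    # Step 2: attach the files to that record (assumes a `.id` attribute).
    return samples_cls.upload(files, sample_id=sample.id)
```

A paired-end call might look like preupload_then_upload(ocx.Samples, ("reads_R1.fastq.gz", "reads_R2.fastq.gz"), ["trimmed"]), where the file names are placeholders.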
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, organization: bool = False, filter: Any = None, tags: list | None = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, filename: str | StrFilter | None = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, size: int | NumFilter | None = <UNSET>, status: str | StrFilter = <UNSET>, visibility: str | StrFilter = <UNSET>, metadata: Metadata | str | RefFilter = <UNSET>, owner: Users | str | RefFilter = <UNSET>, project: Projects | str | RefFilter | None = <UNSET>, **keyword_filters: Any) SampleCollection
Query samples and return a SampleCollection.
Samples are the uploaded sequencing files (FASTA/FASTQ). Filter by any sample field, by metadata field (transparently joined), or by tags.
Examples
Find your recent FASTQ uploads:
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
ocx.Samples.where(
    created_at={"$gte": since},
    filename={"$iendswith": ".fastq.gz"},
)
Filter by tag (resolved by name, id, or Tags instance):
ocx.Samples.where(tags=["trimmed", "human-depleted"])
Filter by a metadata field — transparently joined:
ocx.Samples.where(platform="Illumina NovaSeq 6000")
Search public or organization-shared samples:
ocx.Samples.where(filename={"$icontains": "hmp"}, public=True)
ocx.Samples.where(organization=True)
Apply a client-side predicate after fetching:
ocx.Samples.where(filter=lambda s: not s.tags)
Parameters
- organization
Search samples shared across your organization. Mutually exclusive with public.
- public
Search public samples (capped at 1000 results). Mutually exclusive with organization.
- tags
Tags to filter by. Accepts Tags instances, tag ids, or tag names; all are resolved to refs and combined with $containsall.
Returns
SampleCollection
Notes
Metadata custom fields aren’t server-filterable; filter client-side via SampleCollection.filter().
See OneCodexBase.where() for the full operator reference.
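Since custom metadata fields have to be filtered client-side, a reusable predicate can keep such queries readable. This is a sketch under stated assumptions: custom_equals is a hypothetical helper, and sample.metadata.custom is assumed to behave like a dictionary (or be None).

```python
def custom_equals(key, value):
    """Build a predicate that matches samples whose custom metadata
    has `key` set to `value`.

    Assumes sample.metadata.custom is dict-like or None. Hypothetical
    helper, not part of onecodex.
    """
    def predicate(sample):
        custom = getattr(sample.metadata, "custom", None) or {}
        return custom.get(key) == value

    return predicate
```

Following the client-side predicate example above, it could be passed as ocx.Samples.where(filter=custom_equals("site", "stool")), where the field name and value are placeholders.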
Users
- class onecodex.models.misc.Users(*, field_uri: str, email: str | None)
Bases: OneCodexBase, UserSchema
- model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, email: str | EqStrFilter | None = <UNSET>) list[Self]
Query users.
Users are account holders in your organization. The only filterable field is email, and the API only supports exact match ($eq); substring operators are not accepted server-side.
Example
Find a user by exact email:
user = ocx.Users.where(email="alice@example.com")[0]
See OneCodexBase.where() for the full operator reference.
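Because substring matching on email is not available server-side, any such matching has to happen after the results are fetched. The sketch below shows one way to write that as a predicate. email_in_domain is a hypothetical helper, and it assumes the filter argument of where() accepts a client-side callable, as in the Samples example.

```python
def email_in_domain(domain):
    """Build a predicate matching users whose email ends with @domain.

    The comparison is case-insensitive. Hypothetical helper, not part
    of onecodex.
    """
    suffix = "@" + domain.lower()

    def predicate(user):
        # email is typed str | None, so guard against missing addresses.
        return bool(user.email) and user.email.lower().endswith(suffix)

    return predicate
```

It might be used as ocx.Users.where(filter=email_in_domain("example.com")), provided an unfiltered user query is permitted for your account.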