Models

Analyses

class onecodex.models.analysis.Analyses(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef], analysis_type: str)

Bases: _AnalysesBase, AnalysisSchema

await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase

Poll the API until the analysis reaches a terminal state.

An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.

Parameters

timeoutfloat, optional

Maximum number of seconds to wait. None (default) waits indefinitely.

initial_intervalint, optional

Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.

max_intervalint, optional

Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.

backofffloat, optional

Multiplier applied to the interval after each poll. Defaults to 1.5.

Returns

self : the analysis, refreshed to its terminal state.

Raises

TimeoutError

If timeout elapses before the analysis completes.

download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str

Download analysis result file.

Parameters

filepath: str or FileDetailSchema

Must be one of objects or filepathes returned by get_files

out_path: string, optional

Full path to save the file to. If omitted, defaults to the original filename in the current working directory.

out_file_obj: file-like object, optional

Rather than save the file to a out_path, write it to this file-like object.

progressbar: bool, optional

Display a progress bar using Click for the download?

Returns

string

The path the file was downloaded to, if applicable. Otherwise, None.

Notes

Existing paths will not be overwritten.

get_files() List[FileDetailSchema]

Fetch the files details of an Analyses.

Returns

A list of FileDetailSchema

logs(tail: int | None = None) str

Fetch the job run logs for this analysis.

Parameters

tailint, optional

If set, return only the last tail lines of the logs. Must be >= 1.

Returns

str

The job run logs as a plain-text string. Empty if the analysis has not produced any logs.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

refresh() None

Fetch the current state from the API and update this object’s state fields in-place.

results(json: bool = True)

Fetch the results of an Analyses resource.

Parameters

jsonbool, optional

Return a JSON result (raw API result)? Default True.

Returns

Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]

Query analyses.

An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).

Examples

Find completed analyses since yesterday:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})

Find failed analyses (terminal but unsuccessful):

ocx.Analyses.where(complete=True, success=False)

Find all analyses for a given sample:

ocx.Analyses.where(sample=sample)

See OneCodexBase.where() for the full operator reference.

Alignments

class onecodex.models.analysis.Alignments(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])

Bases: _AnalysesBase, AlignmentSchema

await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase

Poll the API until the analysis reaches a terminal state.

An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.

Parameters

timeoutfloat, optional

Maximum number of seconds to wait. None (default) waits indefinitely.

initial_intervalint, optional

Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.

max_intervalint, optional

Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.

backofffloat, optional

Multiplier applied to the interval after each poll. Defaults to 1.5.

Returns

self : the analysis, refreshed to its terminal state.

Raises

TimeoutError

If timeout elapses before the analysis completes.

download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str

Download analysis result file.

Parameters

filepath: str or FileDetailSchema

Must be one of objects or filepathes returned by get_files

out_path: string, optional

Full path to save the file to. If omitted, defaults to the original filename in the current working directory.

out_file_obj: file-like object, optional

Rather than save the file to a out_path, write it to this file-like object.

progressbar: bool, optional

Display a progress bar using Click for the download?

Returns

string

The path the file was downloaded to, if applicable. Otherwise, None.

Notes

Existing paths will not be overwritten.

get_files() List[FileDetailSchema]

Fetch the files details of an Analyses.

Returns

A list of FileDetailSchema

logs(tail: int | None = None) str

Fetch the job run logs for this analysis.

Parameters

tailint, optional

If set, return only the last tail lines of the logs. Must be >= 1.

Returns

str

The job run logs as a plain-text string. Empty if the analysis has not produced any logs.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

refresh() None

Fetch the current state from the API and update this object’s state fields in-place.

results(json: bool = True)

Fetch the results of an Analyses resource.

Parameters

jsonbool, optional

Return a JSON result (raw API result)? Default True.

Returns

Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]

Query analyses.

An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).

Examples

Find completed analyses since yesterday:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})

Find failed analyses (terminal but unsuccessful):

ocx.Analyses.where(complete=True, success=False)

Find all analyses for a given sample:

ocx.Analyses.where(sample=sample)

See OneCodexBase.where() for the full operator reference.

Classifications

class onecodex.models.analysis.Classifications(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef], results_uri: str | None = None)

Bases: _AnalysesBase, ClassificationSchema

await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase

Poll the API until the analysis reaches a terminal state.

An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.

Parameters

timeoutfloat, optional

Maximum number of seconds to wait. None (default) waits indefinitely.

initial_intervalint, optional

Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.

max_intervalint, optional

Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.

backofffloat, optional

Multiplier applied to the interval after each poll. Defaults to 1.5.

Returns

self : the analysis, refreshed to its terminal state.

Raises

TimeoutError

If timeout elapses before the analysis completes.

download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str

Download analysis result file.

Parameters

filepath: str or FileDetailSchema

Must be one of objects or filepathes returned by get_files

out_path: string, optional

Full path to save the file to. If omitted, defaults to the original filename in the current working directory.

out_file_obj: file-like object, optional

Rather than save the file to a out_path, write it to this file-like object.

progressbar: bool, optional

Display a progress bar using Click for the download?

Returns

string

The path the file was downloaded to, if applicable. Otherwise, None.

Notes

Existing paths will not be overwritten.

get_files() List[FileDetailSchema]

Fetch the files details of an Analyses.

Returns

A list of FileDetailSchema

logs(tail: int | None = None) str

Fetch the job run logs for this analysis.

Parameters

tailint, optional

If set, return only the last tail lines of the logs. Must be >= 1.

Returns

str

The job run logs as a plain-text string. Empty if the analysis has not produced any logs.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

refresh() None

Fetch the current state from the API and update this object’s state fields in-place.

results(json: bool = True) dict | pd.DataFrame

Return the complete results table for a classification.

Parameters

jsonbool, optional

Return result as JSON? Default True.

Returns

tabledict or pd.DataFrame

Return a JSON object with the classification results or a pd.DataFrame if json=False.

table() pd.DataFrame

Return the complete results table for the classification.

Returns

tablepd.DataFrame

A Pandas DataFrame of the classification results.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>) SampleCollection

Query classifications and return a SampleCollection.

Classifications are taxonomic results — typically the One Codex Database run against each sample. Filters mirror those on Analyses.where().

Examples

Find recent successful classifications:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Classifications.where(
    complete=True,
    success=True,
    created_at={"$gte": since},
)

Find the classification for a specific sample:

cls_run = ocx.Classifications.where(sample=sample)[0]

See OneCodexBase.where() for the full operator reference.

FunctionalProfiles

class onecodex.models.analysis.FunctionalProfiles(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])

Bases: _AnalysesBase, FunctionalRunSchema

await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase

Poll the API until the analysis reaches a terminal state.

An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.

Parameters

timeoutfloat, optional

Maximum number of seconds to wait. None (default) waits indefinitely.

initial_intervalint, optional

Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.

max_intervalint, optional

Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.

backofffloat, optional

Multiplier applied to the interval after each poll. Defaults to 1.5.

Returns

self : the analysis, refreshed to its terminal state.

Raises

TimeoutError

If timeout elapses before the analysis completes.

download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str

Download analysis result file.

Parameters

filepath: str or FileDetailSchema

Must be one of objects or filepathes returned by get_files

out_path: string, optional

Full path to save the file to. If omitted, defaults to the original filename in the current working directory.

out_file_obj: file-like object, optional

Rather than save the file to a out_path, write it to this file-like object.

progressbar: bool, optional

Display a progress bar using Click for the download?

Returns

string

The path the file was downloaded to, if applicable. Otherwise, None.

Notes

Existing paths will not be overwritten.

filtered_table(annotation: FunctionalAnnotations, metric: FunctionalAnnotationsMetric, taxa_stratified: bool = True)

Return a results table for the functional analysis.

Parameters

annotationonecodex.lib.enum.FunctionalAnnotation, required

Return a table for a given onecodex.lib.enum.FunctionalAnnotation

metriconecodex.lib.enum.FunctionalAnnotationMetric, required

Return a table for a given onecodex.lib.enum.FunctionalAnnotationMetric

taxa_stratifiedbool, optional

If False, return data only by annotation ID, ignoring taxonomic stratification

Returns

results_dfpd.DataFrame

A Pandas DataFrame of the functional results.

get_files() List[FileDetailSchema]

Fetch the files details of an Analyses.

Returns

A list of FileDetailSchema

logs(tail: int | None = None) str

Fetch the job run logs for this analysis.

Parameters

tailint, optional

If set, return only the last tail lines of the logs. Must be >= 1.

Returns

str

The job run logs as a plain-text string. Empty if the analysis has not produced any logs.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

refresh() None

Fetch the current state from the API and update this object’s state fields in-place.

results(json: bool = True)

Return the complete results table for a functional analysis.

Parameters

jsonbool, optional

Return result as JSON? Default True.

Returns

tabledict or pd.DataFrame

Return a JSON object with the functional analysis results or a pd.DataFrame if json=False.

table(annotation: FunctionalAnnotations | None = None, taxa_stratified: bool = True)

Return a results table for the functional analysis.

Parameters

annotation{None, onecodex.lib.enum.FunctionalAnnotation}, optional

If None, return a table with all annotations, otherwise filter to one of onecodex.lib.enum.FunctionalAnnotation

taxa_stratifiedbool, optional

If False, return data only by annotation ID, ignoring taxonomic stratification

Returns

results_dfpd.DataFrame

A Pandas DataFrame of the functional results.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]

Query analyses.

An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).

Examples

Find completed analyses since yesterday:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})

Find failed analyses (terminal but unsuccessful):

ocx.Analyses.where(complete=True, success=False)

Find all analyses for a given sample:

ocx.Analyses.where(sample=sample)

See OneCodexBase.where() for the full operator reference.

Panels

class onecodex.models.analysis.Panels(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])

Bases: _AnalysesBase, PanelSchema

await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase

Poll the API until the analysis reaches a terminal state.

An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.

Parameters

timeoutfloat, optional

Maximum number of seconds to wait. None (default) waits indefinitely.

initial_intervalint, optional

Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.

max_intervalint, optional

Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.

backofffloat, optional

Multiplier applied to the interval after each poll. Defaults to 1.5.

Returns

self : the analysis, refreshed to its terminal state.

Raises

TimeoutError

If timeout elapses before the analysis completes.

download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str

Download analysis result file.

Parameters

filepath: str or FileDetailSchema

Must be one of objects or filepathes returned by get_files

out_path: string, optional

Full path to save the file to. If omitted, defaults to the original filename in the current working directory.

out_file_obj: file-like object, optional

Rather than save the file to a out_path, write it to this file-like object.

progressbar: bool, optional

Display a progress bar using Click for the download?

Returns

string

The path the file was downloaded to, if applicable. Otherwise, None.

Notes

Existing paths will not be overwritten.

get_files() List[FileDetailSchema]

Fetch the files details of an Analyses.

Returns

A list of FileDetailSchema

logs(tail: int | None = None) str

Fetch the job run logs for this analysis.

Parameters

tailint, optional

If set, return only the last tail lines of the logs. Must be >= 1.

Returns

str

The job run logs as a plain-text string. Empty if the analysis has not produced any logs.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

refresh() None

Fetch the current state from the API and update this object’s state fields in-place.

results(json=True)

Fetch the results of an Analyses resource.

Parameters

jsonbool, optional

Return a JSON result (raw API result)? Default True.

Returns

Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]

Query analyses.

An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).

Examples

Find completed analyses since yesterday:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})

Find failed analyses (terminal but unsuccessful):

ocx.Analyses.where(complete=True, success=False)

Find all analyses for a given sample:

ocx.Analyses.where(sample=sample)

See OneCodexBase.where() for the full operator reference.

Mlsts

class onecodex.models.analysis.Mlsts(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], updated_at: rfc3339_decoder, json_schema_input_type=PydanticUndefined), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'date-time'})] | None = None, complete: bool = False, error_msg: str | None = None, job: Jobs | ApiRef, job_args: dict[str, Any] = {}, sample: Samples | ApiRef, success: bool | None = False, cost: CostSchema | None, draft: bool, dependencies: list[BaseAnalysisSchema] | list[ApiRef])

Bases: _AnalysesBase, MlstSchema

await_completion(timeout: float | None = None, initial_interval: int = 5, max_interval: int = 120, backoff: float = 1.5) _AnalysesBase

Poll the API until the analysis reaches a terminal state.

An analysis is considered terminal when complete is True. The method refreshes self in place and returns it.

Parameters

timeoutfloat, optional

Maximum number of seconds to wait. None (default) waits indefinitely.

initial_intervalint, optional

Seconds to wait between the first polls. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 5.

max_intervalint, optional

Upper bound on the polling interval as it backs off. Must be at least MIN_POLL_INTERVAL_SECONDS (5). Defaults to 120 seconds.

backofffloat, optional

Multiplier applied to the interval after each poll. Defaults to 1.5.

Returns

self : the analysis, refreshed to its terminal state.

Raises

TimeoutError

If timeout elapses before the analysis completes.

download_file(filepath: str | FileDetailSchema, out_path: str | None = None, out_file_obj: IO | None = None, progressbar: bool = False) str

Download analysis result file.

Parameters

filepath: str or FileDetailSchema

Must be one of objects or filepathes returned by get_files

out_path: string, optional

Full path to save the file to. If omitted, defaults to the original filename in the current working directory.

out_file_obj: file-like object, optional

Rather than save the file to a out_path, write it to this file-like object.

progressbar: bool, optional

Display a progress bar using Click for the download?

Returns

string

The path the file was downloaded to, if applicable. Otherwise, None.

Notes

Existing paths will not be overwritten.

get_files() List[FileDetailSchema]

Fetch the files details of an Analyses.

Returns

A list of FileDetailSchema

logs(tail: int | None = None) str

Fetch the job run logs for this analysis.

Parameters

tailint, optional

If set, return only the last tail lines of the logs. Must be >= 1.

Returns

str

The job run logs as a plain-text string. Empty if the analysis has not produced any logs.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

refresh() None

Fetch the current state from the API and update this object’s state fields in-place.

results(json=True)

Fetch the results of an Analyses resource.

Parameters

jsonbool, optional

Return a JSON result (raw API result)? Default True.

Returns

Return type varies by Analyses resource sub-type. See, e.g., Classifications or Panels for documentation.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, complete: bool | BoolFilter = <UNSET>, draft: bool | BoolFilter = <UNSET>, success: bool | BoolFilter = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, analysis_type: str | StrFilter = <UNSET>, job: Jobs | str | RefFilter = <UNSET>, sample: Samples | str | RefFilter = <UNSET>, **keyword_filters: Any) list[Self]

Query analyses.

An analysis is the result of running a job on a sample. Use this when you want results across analysis types (classifications, panels, functional profiles, etc.); for a single type, prefer the dedicated subclass (e.g. Classifications).

Examples

Find completed analyses since yesterday:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ocx.Analyses.where(complete=True, created_at={"$gte": since})

Find failed analyses (terminal but unsuccessful):

ocx.Analyses.where(complete=True, success=False)

Find all analyses for a given sample:

ocx.Analyses.where(sample=sample)

See OneCodexBase.where() for the full operator reference.

Assets

class onecodex.models.misc.Assets(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], name: str, filename: str, size: int | None = None, status: str, uploader: Users | ApiRef)

Bases: OneCodexBase, AssetSchema, ResourceDownloadMixin

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod upload(file_path, progressbar=None, name=None)

Upload a file to an asset.

Parameters

file_pathstring

A path to a file on the system.

progressbarclick.progressbar, optional

If passed, display a progress bar using Click.

namestring, optional

If passed, name is sent with upload request and is associated with asset.

Returns

An Assets object upon successful upload. None if the upload failed.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, name: str | StrFilter = <UNSET>, filename: str | StrFilter = <UNSET>, size: int | NumFilter | None = <UNSET>, status: str | StrFilter = <UNSET>, uploader: Users | str | RefFilter = <UNSET>) list[Self]

Query assets.

Assets are reusable inputs to custom jobs — reference databases, index files, anything you want available at job run time. Shared with everyone in your organization.

Examples

Find assets by name:

ocx.Assets.where(name={"$icontains": "kraken"})

Find only assets ready to use (status="available"):

ocx.Assets.where(status="available")

See OneCodexBase.where() for the full operator reference.

Documents

class onecodex.models.misc.Documents(*, field_uri: str, created_at: Annotated[datetime, PlainSerializer(func=rfc3339_encoder, return_type=str, when_used=always), BeforeValidator(func=rfc3339_decoder, json_schema_input_type=PydanticUndefined)], filename: str, size: int | None = None, uploader: Users | ApiRef, downloaders: List[Users | ApiRef] = [])

Bases: OneCodexBase, DocumentSchema, ResourceDownloadMixin

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod upload(file_path, progressbar=None)

Upload a series of files to the One Codex server.

Parameters

file_pathstring

A path to a file on the system.

progressbarclick.progressbar, optional

If passed, display a progress bar using Click.

Returns

A Documents object upon successful upload. None if the upload failed.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, filename: str | StrFilter = <UNSET>, size: int | NumFilter | None = <UNSET>, uploader: Users | str | RefFilter = <UNSET>) list[Self]

Query documents.

Documents are arbitrary files (PDFs, spreadsheets, etc.) uploaded to your account, optionally shared with other users.

Examples

Find documents uploaded since a date:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
ocx.Documents.where(created_at={"$gte": since})

Find documents larger than 10 MB:

ocx.Documents.where(size={"$gte": 10 * 1024 * 1024})

Find documents uploaded by a specific user:

ocx.Documents.where(uploader=user)

See OneCodexBase.where() for the full operator reference.

Jobs

class onecodex.models.misc.Jobs(*, field_uri: str, created_at: ~typing.Annotated[~datetime.datetime, ~pydantic.functional_serializers.PlainSerializer(func=~onecodex.models.schemas.types.rfc3339_encoder, return_type=str, when_used=always), ~pydantic.functional_validators.BeforeValidator(func=~onecodex.models.schemas.types.rfc3339_decoder, json_schema_input_type=PydanticUndefined)], name: str, job_args_schema: dict[str, ~typing.Any] = <factory>, analysis_type: str, public: bool, job_type: str | None = None)

Bases: OneCodexBase, JobSchema

details() JobDetails

Fetch the job’s detail fields and return them as a JobDetails.

Includes script, image_uri, cpu, ram_gb, storage_gb, repository, assets, dependencies, and arguments_schema. Only available for user-created jobs the current user has permission to view the details of.

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, created_at: datetime | DatetimeFilter = <UNSET>, name: str | StrFilter = <UNSET>, analysis_type: str | StrFilter = <UNSET>) list[Self]

Query jobs.

Jobs are runnable workflows — both built-in (e.g. classification) and custom (Nextflow pipelines, shell scripts). Use public=True to include One Codex’s built-in jobs and any other public custom jobs in addition to your own.

Examples

Find your custom jobs by name:

ocx.Jobs.where(name={"$icontains": "amplicon"})

Find all classification-type jobs (yours + public):

ocx.Jobs.where(analysis_type="classification", public=True)

See OneCodexBase.where() for the full operator reference.

Projects

class onecodex.models.misc.Projects(*, field_uri: str, description: str | None = None, external_id: str | None = None, name: str | None = None, owner: Users | ApiRef, permissions: list[str], project_name: Annotated[str | None, MinLen(min_length=3), MaxLen(max_length=15), _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]{3,15}$')], public: bool = False)

Bases: OneCodexBase, ProjectSchema

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, name: str | StrFilter | None = <UNSET>, project_name: str | StrFilter | None = <UNSET>, description: str | StrFilter | None = <UNSET>, external_id: str | StrFilter | None = <UNSET>, owner: Users | str | RefFilter | None = <UNSET>) list[Self]

Query projects.

Projects group related samples. Set public=True to search across all public projects on One Codex.

Examples

Find a project by exact name:

proj = ocx.Projects.where(name="HMP Phase II")[0]

Find public projects containing a keyword:

ocx.Projects.where(name={"$icontains": "gut"}, public=True)

See OneCodexBase.where() for the full operator reference.

Samples

class onecodex.models.sample.Samples(*, field_uri: str, created_at: datetime, error_msg: str | None = None, filename: str | None = None, metadata: Metadata | ApiRef, owner: Users | ApiRef, primary_classification: Classifications | ApiRef | None = None, project: Projects | ApiRef | None = None, size: int | None = None, tags: List[Tags | ApiRef] = [], status: str, visibility: str, updated_at: datetime | None = None)

Bases: OneCodexBase, _SampleSchema, ResourceDownloadMixin

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod preupload(metadata=None, tags=None, project=None)

Create a sample in a waiting state where the files will be sent later on.

Parameters

metadata : dict, optional

tagslist, optional

A list of optional tags to create. Tags must be passed as dictionaries with a single key name and the tag name, e.g., {“name”: “my tag”}. New tags will be created on-the-fly as needed.

projectstring, optional

UUID of project to associate this sample with.

save()

Send changes on this Samples object to the One Codex server.

Changes to the metadata object and tags list are passed as well.

classmethod upload(files, metadata=None, tags=None, project=None, coerce_ascii=False, progressbar=None, sample_id=None, external_sample_id=None)

Upload a series of files to the One Codex server.

Parameters

filesstring or tuple

A single path to a file on the system, or a tuple containing a pairs of paths. Tuple values will be interleaved as paired-end reads and both files should contain the same number of records. Paths to single files will be uploaded as-is.

metadata : dict, optional

tagslist, optional

A list of optional tags to create. Tags must be passed as dictionaries with a single key name and the tag name, e.g., {“name”: “my tag”}. New tags will be created on-the-fly as needed.

projectstring, optional

UUID of project to associate this sample with.

coerce_asciibool, optional

If true, rename unicode filenames to ASCII and issue warning.

progressbarclick.progressbar, optional

If passed, display a progress bar using Click.

sample_idstring, optional

If passed, will upload the file(s) to the sample with that id. Only works if the sample was pre-uploaded

external_sample_idstring, optional

If passed, will upload the file(s) to the sample with that metadata external id. Only works if the sample was pre-uploaded

Returns

A Samples object upon successful upload. None if the upload failed.

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, organization: bool = False, filter: Any = None, tags: list | None = None, created_at: datetime | DatetimeFilter = <UNSET>, updated_at: datetime | DatetimeFilter = <UNSET>, filename: str | StrFilter | None = <UNSET>, error_msg: str | StrFilter | None = <UNSET>, size: int | NumFilter | None = <UNSET>, status: str | StrFilter = <UNSET>, visibility: str | StrFilter = <UNSET>, metadata: Metadata | str | RefFilter = <UNSET>, owner: Users | str | RefFilter = <UNSET>, project: Projects | str | RefFilter | None = <UNSET>, **keyword_filters: Any) SampleCollection

Query samples and return a SampleCollection.

Samples are the uploaded sequencing files (FASTA/FASTQ). Filter by any sample field, by metadata field (transparently joined), or by tags.

Examples

Find your recent FASTQ uploads:

from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
ocx.Samples.where(
    created_at={"$gte": since},
    filename={"$iendswith": ".fastq.gz"},
)

Filter by tag (resolved by name, id, or Tags instance):

ocx.Samples.where(tags=["trimmed", "human-depleted"])

Filter by a metadata field — transparently joined:

ocx.Samples.where(platform="Illumina NovaSeq 6000")

Search public or organization-shared samples:

ocx.Samples.where(filename={"$icontains": "hmp"}, public=True)
ocx.Samples.where(organization=True)

Apply a client-side predicate after fetching:

ocx.Samples.where(filter=lambda s: not s.tags)

Parameters

organization

Search samples shared across your organization. Mutually exclusive with public.

public

Search public samples (capped at 1000 results). Mutually exclusive with organization.

tags

Tags to filter by. Accepts Tags instances, tag ids, or tag names — all resolved to refs and combined with $containsall.

Returns

SampleCollection

Notes

Metadata custom fields aren’t server-filterable — filter client-side via SampleCollection.filter().

See OneCodexBase.where() for the full operator reference.

Tags

class onecodex.models.misc.Tags(*, field_uri: str | None = None, name: Annotated[str, MaxLen(max_length=30)])

Bases: OneCodexBase, TagSchema

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, name: str | StrFilter = <UNSET>) list[Self]

Query tags.

Tags are short labels attached to samples for organizing and filtering them.

Examples

Find a tag by exact name:

tag = ocx.Tags.where(name="trimmed")[0]

Find tags containing a substring:

ocx.Tags.where(name={"$icontains": "qc"})

See OneCodexBase.where() for the full operator reference.

Users

class onecodex.models.misc.Users(*, field_uri: str, email: str | None)

Bases: OneCodexBase, UserSchema

model_config = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'populate_by_alias': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod where(*filters: str | dict, sort: str | list[str] | None = None, limit: int | None = None, public: bool = False, filter: Any = None, email: str | EqStrFilter | None = <UNSET>) list[Self]

Query users.

Users are account holders in your organization. The only filterable field is email, and the API only supports exact-match ($eq) — substring operators are not accepted server-side.

Example

Find a user by exact email:

user = ocx.Users.where(email="alice@example.com")[0]

See OneCodexBase.where() for the full operator reference.