Persistent knowledge across sessions with async LLM extraction.

Module Path

python
from orbiter.memory.long_term import (
    LongTermMemory,
    ExtractionType,
    Extractor,
    ExtractionTask,
    TaskStatus,
    MemoryOrchestrator,
    OrchestratorConfig,
)

ExtractionType

Types of knowledge that can be extracted from conversations.

python
class ExtractionType(StrEnum):
    USER_PROFILE = "user_profile"
    AGENT_EXPERIENCE = "agent_experience"
    FACTS = "facts"

TaskStatus

Lifecycle states for extraction tasks.

python
class TaskStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

Extractor (Protocol)

Protocol for LLM-powered knowledge extraction.

python
@runtime_checkable
class Extractor(Protocol):
    async def extract(self, prompt: str) -> str: ...

Implement this protocol by wrapping your LLM provider.
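
As a minimal sketch of what such a wrapper might look like, the block below adapts an arbitrary async `prompt -> text` function to the protocol. The protocol is redeclared locally so the example runs on its own; `CallableExtractor` and `fake_llm` are hypothetical names that stand in for a real provider client and are not part of orbiter.

```python
import asyncio
from typing import Awaitable, Callable, Protocol, runtime_checkable


@runtime_checkable
class Extractor(Protocol):
    """Local copy of the documented protocol so the sketch runs standalone."""

    async def extract(self, prompt: str) -> str: ...


class CallableExtractor:
    """Hypothetical adapter: wraps any async `prompt -> text` callable."""

    def __init__(self, complete: Callable[[str], Awaitable[str]]) -> None:
        self._complete = complete

    async def extract(self, prompt: str) -> str:
        # Delegate to the wrapped provider call and trim surrounding whitespace.
        text = await self._complete(prompt)
        return text.strip()


async def fake_llm(prompt: str) -> str:
    # Stand-in for a real provider request (assumption, not an orbiter API).
    return f"  summary of: {prompt}  "


extractor = CallableExtractor(fake_llm)

# runtime_checkable only checks that an `extract` attribute exists;
# it does not validate the signature or that the method is async.
print(isinstance(extractor, Extractor))  # True
print(asyncio.run(extractor.extract("user likes Python")))  # summary of: user likes Python
```

Because the check is structural, the wrapper does not need to inherit from `Extractor`; any object with a matching `extract` method qualifies.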


ExtractionTask

A single knowledge extraction task.

Decorator: @dataclass(slots=True)

Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| extraction_type | ExtractionType | (required) | What kind of knowledge to extract |
| source_items | list[MemoryItem] | (required) | Memory items to extract from |
| task_id | str | Auto UUID | Unique identifier |
| status | TaskStatus | PENDING | Current lifecycle status |
| result | str \| None | None | Extracted text (set on completion) |
| error | str \| None | None | Error message (set on failure) |
| created_at | str | ISO-8601 now | Creation timestamp |
| completed_at | str \| None | None | Completion timestamp |

Methods

| Method | Description |
| --- | --- |
| start() | Mark task as RUNNING |
| complete(result) | Mark as COMPLETED with result text |
| fail(error) | Mark as FAILED with error message |

LongTermMemory

Persistent memory store for knowledge that spans sessions. Implements the MemoryStore protocol with content-based deduplication.

Constructor

python
LongTermMemory(*, namespace: str = "default")

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| namespace | str | "default" | Isolation namespace for multi-tenant usage |

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| namespace | str | The isolation namespace |

Methods (MemoryStore Protocol)

add()

python
async def add(self, item: MemoryItem) -> None

Persist a memory item, deduplicating by content. Items with identical content and memory_type are silently skipped.

get()

python
async def get(self, item_id: str) -> MemoryItem | None

Retrieve an item by its ID, or None if no item with that ID exists.

search()

python
async def search(
    self,
    *,
    query: str = "",
    metadata: MemoryMetadata | None = None,
    memory_type: str | None = None,
    status: MemoryStatus | None = None,
    limit: int = 10,
) -> list[MemoryItem]

Search with filters. Results sorted by creation time (newest first).

clear()

python
async def clear(self, *, metadata: MemoryMetadata | None = None) -> int

Remove items matching filter.


OrchestratorConfig

Configuration for the memory orchestrator.

Decorator: @dataclass(frozen=True, slots=True)

Constructor

python
OrchestratorConfig(
    extraction_types: tuple[ExtractionType, ...] = (
        ExtractionType.USER_PROFILE,
        ExtractionType.AGENT_EXPERIENCE,
        ExtractionType.FACTS,
    ),
    prompts: dict[ExtractionType, str] = {},
    min_items: int = 3,
)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| extraction_types | tuple[ExtractionType, ...] | All three types | Which knowledge types to extract |
| prompts | dict[ExtractionType, str] | {} | Custom prompt overrides per type |
| min_items | int | 3 | Minimum items to trigger extraction |

Methods

get_prompt()

python
def get_prompt(self, extraction_type: ExtractionType) -> str

Get the prompt for an extraction type, falling back to built-in defaults.


MemoryOrchestrator

Orchestrates async LLM extraction of structured knowledge from conversations.

Constructor

python
MemoryOrchestrator(
    store: LongTermMemory,
    *,
    config: OrchestratorConfig | None = None,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| store | LongTermMemory | (required) | Long-term memory store for results |
| config | OrchestratorConfig \| None | None | Configuration (defaults to OrchestratorConfig()) |

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| store | LongTermMemory | The backing store |
| config | OrchestratorConfig | Orchestrator configuration |

Methods

submit()

python
def submit(
    self,
    items: Sequence[MemoryItem],
    *,
    extraction_type: ExtractionType | None = None,
    metadata: MemoryMetadata | None = None,
) -> list[ExtractionTask]

Submit extraction tasks. Creates one task per configured extraction type (or a single task if extraction_type is specified).

process()

python
async def process(
    self,
    task_id: str,
    extractor: Any,
    *,
    metadata: MemoryMetadata | None = None,
) -> ExtractionTask

Process a single extraction task. Runs the extractor, stores the result, updates the task status.

Raises: KeyError if no task with the given ID exists.

process_all()

python
async def process_all(
    self,
    extractor: Any,
    *,
    metadata: MemoryMetadata | None = None,
) -> list[ExtractionTask]

Process all pending tasks.

get_task()

python
def get_task(self, task_id: str) -> ExtractionTask | None

Look up a task by ID, or None if no task with that ID exists.

list_tasks()

python
def list_tasks(self, *, status: TaskStatus | None = None) -> list[ExtractionTask]

List tasks, optionally filtered by status.

Example

python
import asyncio
from orbiter.memory import (
    LongTermMemory, MemoryOrchestrator, OrchestratorConfig,
    ExtractionType, HumanMemory, AIMemory,
)

class MyExtractor:
    async def extract(self, prompt: str) -> str:
        return "Extracted: user prefers Python"

async def main():
    store = LongTermMemory(namespace="user-1")
    orchestrator = MemoryOrchestrator(
        store,
        config=OrchestratorConfig(
            extraction_types=(ExtractionType.USER_PROFILE,),
            # Only two items are submitted below; the default min_items is 3.
            min_items=2,
        ),
    )

    # Submit items for extraction
    items = [
        HumanMemory(content="I love Python and data science"),
        AIMemory(content="Great! Python is excellent for data science."),
    ]
    tasks = orchestrator.submit(items)

    # Process with extractor
    results = await orchestrator.process_all(MyExtractor())
    for task in results:
        print(f"Status: {task.status}, Result: {task.result}")

    # Check long-term memory
    stored = await store.search(memory_type="user_profile")
    print(stored)

asyncio.run(main())