Production-ready · Stable v1

Kubernetes operators in Python

Write event-driven or state-driven handler functions with simple decorators. No boilerplate, no infrastructure glue code. Just your domain logic.

handlers.py
import kopf

@kopf.on.create('kopfexamples')
def create_fn(spec, name, **_):
    print(f"Created {name} with spec: {spec}")

@kopf.on.update('kopfexamples')
def update_fn(old, new, diff, **_):
    print(f"Updated with changes: {diff}")

@kopf.on.delete('kopfexamples')
def delete_fn(name, **_):
    print(f"Deleted {name}. Goodbye!")
8 lines
for a working operator
Python 3.10+
CPython & PyPy
MIT
open-source license
Stable v1
production-ready

Everything you need to build operators

From simple event handlers to daemons, webhooks, and multi-operator peering — all with clean, Pythonic APIs.

Declarative handlers

Register handlers with decorators: @kopf.on.create, @kopf.on.update, @kopf.on.delete. Focus on your domain logic, not K8s plumbing.

Daemons & timers

Run long-lived background tasks per resource with @kopf.daemon or periodic logic with @kopf.timer. Threads or async — your choice.

Admission webhooks

Validate and mutate resources with @kopf.on.validate and @kopf.on.mutate. Built-in dev-mode tunneling for local development.

In-memory indexing

Maintain live indices of resources with @kopf.index. Cross-resource awareness without extra API calls.

Eventual consistency

Automatic retries with configurable limits. Progress persists across operator restarts. Handles changes after lengthy downtimes.

Multi-operator peering

Run multiple operators for the same resources without conflicts. Cross-pod awareness prevents double-processing.

Smart filtering

Filter by labels, annotations, fields, or arbitrary callbacks. Stealth mode suppresses logging for filtered-out resources.

Sync & async

Write handlers as plain functions or async coroutines. Sync handlers run in threads automatically. Mix both styles freely.

Testing toolkit

Built-in KopfRunner for in-memory operator testing. Run your operator in tests without a cluster.

See it in action

From a minimal handler to daemons and webhooks — everything stays readable and Pythonic.

handlers.py
import kopf

@kopf.on.create('kopfexamples')
def create_fn(spec, name, **_):
    print(f"Created {name} with spec: {spec}")
    return {'message': f'Resource {name} created successfully'}

@kopf.on.update('kopfexamples')
def update_fn(old, new, diff, **_):
    for op, field, old_val, new_val in diff:
        print(f"{field}: {old_val} -> {new_val}")

@kopf.on.delete('kopfexamples')
def delete_fn(name, logger, **_):
    logger.info(f"Cleaning up {name}...")  # Also a K8s event

High-level cause detection: Kopf translates raw K8s watch events into meaningful causes like create, update, and delete. Handler return values are persisted to the resource status.

daemons.py
import time
import kopf

# A background thread that runs while the resource exists
@kopf.daemon('kopfexamples')
def monitor(spec, stopped, **_):
    while not stopped:
        print(f"Monitoring: {spec}")
        time.sleep(10)

# Or use a timer for periodic checks
@kopf.timer('kopfexamples', interval=60, idle=600)
def reconcile(spec, **_):
    print(f"Periodic reconciliation: {spec}")

# Async works too
@kopf.daemon('kopfexamples')
async def async_monitor(spec, stopped, **_):
    while not stopped:
        await asyncio.sleep(5)

Daemons start when a resource appears and stop when it is deleted. Timers run periodically with configurable intervals and idle delays. Both support sync and async styles.

webhooks.py
import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
    settings.admission.server = kopf.WebhookAutoServer()
    settings.admission.managed = 'auto.kopf.dev'

@kopf.on.validate('kopfexamples')
def validate(spec, **_):
    if spec.get('size') and spec['size'] > 100:
        raise kopf.AdmissionError("Size must be <= 100")

@kopf.on.mutate('kopfexamples')
def set_defaults(patch: kopf.Patch, **_):
    patch.spec['managed'] = True
    patch.spec.setdefault('replicas', 1)

Validating and mutating admission webhooks with automatic webhook configuration management. Built-in dev-mode tunneling lets you develop webhooks locally.

errors.py
import kopf

@kopf.on.create('kopfexamples', retries=5, timeout=300)
def create_fn(spec, retry, **_):
    # Temporary failures retry after a delay
    if not is_dependency_ready(spec):
        raise kopf.TemporaryError("Waiting for dependency", delay=10)

    # Permanent failures never retry
    if spec.get('field') == 'invalid':
        raise kopf.PermanentError("Invalid configuration")

    # Arbitrary exceptions also retry (with backoff)
    result = provision_resource(spec)
    return {'provisioned': result.id}

Fine-grained error handling with temporary and permanent errors. Retry counts, timeouts, and backoff are all configurable. Progress is persisted and survives operator restarts.

test_operator.py
import subprocess
import time
from kopf.testing import KopfRunner

def test_operator_handles_creation():
    with KopfRunner(['run', '-A', '--verbose', 'handlers.py']) as runner:
        subprocess.run("kubectl apply -f obj.yaml",
                       shell=True, check=True)
        time.sleep(1)

    assert runner.exit_code == 0
    assert runner.exception is None
    assert 'Created' in runner.output

The built-in testing toolkit runs your operator in-process for integration tests. Combine with kubectl commands to verify end-to-end behavior.

How Kopf works

Kopf watches Kubernetes resources and translates raw watch events into high-level causes for your handlers.

1

Watch

Kopf opens watch streams to the Kubernetes API for the resource types you register handlers for.

2

Detect causes

Raw events are analyzed to determine what actually changed: creation, update with diffs, deletion, or resumption after restart.

3

Run handlers

Matching handlers are invoked with rich kwargs: spec, diff, old/new, logger, and more. Results are persisted.

Works with any
K8s resource

Custom resources, built-in resources, multiple types in one operator. Both cluster-scoped and namespaced.

  • Custom Resource Definitions (CRDs)
  • Built-in resources: Pods, Namespaces, ConfigMaps, etc.
  • Multiple resource types in a single operator
  • Client agnostic: works with any K8s client library
  • Embeddable into existing Python applications
multi.py
import kopf

# Custom resources
@kopf.on.create('kopfexamples')
def on_example_create(spec, **_):
    return {'ready': True}

# Built-in resources
@kopf.on.create('', 'v1', 'pods')
def on_pod_create(name, namespace, **_):
    print(f"New pod: {namespace}/{name}")

# Namespaced filtering
@kopf.on.update('apps', 'v1', 'deployments',
                labels={'managed-by': 'kopf'})
def on_deploy_update(diff, **_):
    for op, field, old, new in diff:
        print(f"{field}: {old} -> {new}")

Get started in minutes

From install to a running operator in three steps.

1

Install Kopf

pip install kopf
2

Write your handlers

import kopf

@kopf.on.create('kopfexamples')
def create_fn(spec, name, **_):
    print(f"Created {name} with spec: {spec}")
3

Run it

kopf run handlers.py --verbose

For production, package it in a Docker image:

FROM python:3.14
ADD . /src
RUN pip install kopf
CMD kopf run /src/handlers.py --verbose

Ready to build your operator?

Kopf handles the infrastructure so you can focus on what matters: your domain logic.