Write event-driven or state-driven handler functions with simple decorators. No boilerplate, no infrastructure glue code. Just your domain logic.
import kopf
@kopf.on.create('kopfexamples')
def create_fn(spec, name, **_):
print(f"Created {name} with spec: {spec}")
@kopf.on.update('kopfexamples')
def update_fn(old, new, diff, **_):
print(f"Updated with changes: {diff}")
@kopf.on.delete('kopfexamples')
def delete_fn(name, **_):
print(f"Deleted {name}. Goodbye!")
From simple event handlers to daemons, webhooks, and multi-operator peering — all with clean, Pythonic APIs.
Register handlers with decorators: @kopf.on.create, @kopf.on.update, @kopf.on.delete. Focus on your domain logic, not K8s plumbing.
Run long-lived background tasks per resource with @kopf.daemon or periodic logic with @kopf.timer. Threads or async — your choice.
Validate and mutate resources with @kopf.on.validate and @kopf.on.mutate. Built-in dev-mode tunneling for local development.
Maintain live indices of resources with @kopf.index. Cross-resource awareness without extra API calls.
Automatic retries with configurable limits. Progress persists across operator restarts. Handles changes after lengthy downtimes.
Run multiple operators for the same resources without conflicts. Cross-pod awareness prevents double-processing.
Filter by labels, annotations, fields, or arbitrary callbacks. Stealth mode suppresses logging for filtered-out resources.
Write handlers as plain functions or async coroutines. Sync handlers run in threads automatically. Mix both styles freely.
Built-in KopfRunner for in-memory operator testing. Run your operator in tests without a cluster.
From a minimal handler to daemons and webhooks — everything stays readable and Pythonic.
import kopf
@kopf.on.create('kopfexamples')
def create_fn(spec, name, **_):
print(f"Created {name} with spec: {spec}")
return {'message': f'Resource {name} created successfully'}
@kopf.on.update('kopfexamples')
def update_fn(old, new, diff, **_):
for op, field, old_val, new_val in diff:
print(f"{field}: {old_val} -> {new_val}")
@kopf.on.delete('kopfexamples')
def delete_fn(name, logger, **_):
logger.info(f"Cleaning up {name}...") # Also a K8s event
High-level cause detection: Kopf translates raw K8s watch events into meaningful causes like create, update, and delete. Handler return values are persisted to the resource status.
import time
import kopf
# A background thread that runs while the resource exists
@kopf.daemon('kopfexamples')
def monitor(spec, stopped, **_):
while not stopped:
print(f"Monitoring: {spec}")
time.sleep(10)
# Or use a timer for periodic checks
@kopf.timer('kopfexamples', interval=60, idle=600)
def reconcile(spec, **_):
print(f"Periodic reconciliation: {spec}")
# Async works too
@kopf.daemon('kopfexamples')
async def async_monitor(spec, stopped, **_):
while not stopped:
await asyncio.sleep(5)
Daemons start when a resource appears and stop when it is deleted. Timers run periodically with configurable intervals and idle delays. Both support sync and async styles.
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.admission.server = kopf.WebhookAutoServer()
settings.admission.managed = 'auto.kopf.dev'
@kopf.on.validate('kopfexamples')
def validate(spec, **_):
if spec.get('size') and spec['size'] > 100:
raise kopf.AdmissionError("Size must be <= 100")
@kopf.on.mutate('kopfexamples')
def set_defaults(patch: kopf.Patch, **_):
patch.spec['managed'] = True
patch.spec.setdefault('replicas', 1)
Validating and mutating admission webhooks with automatic webhook configuration management. Built-in dev-mode tunneling lets you develop webhooks locally.
import kopf
@kopf.on.create('kopfexamples', retries=5, timeout=300)
def create_fn(spec, retry, **_):
# Temporary failures retry after a delay
if not is_dependency_ready(spec):
raise kopf.TemporaryError("Waiting for dependency", delay=10)
# Permanent failures never retry
if spec.get('field') == 'invalid':
raise kopf.PermanentError("Invalid configuration")
# Arbitrary exceptions also retry (with backoff)
result = provision_resource(spec)
return {'provisioned': result.id}
Fine-grained error handling with temporary and permanent errors. Retry counts, timeouts, and backoff are all configurable. Progress is persisted and survives operator restarts.
import subprocess
import time
from kopf.testing import KopfRunner
def test_operator_handles_creation():
with KopfRunner(['run', '-A', '--verbose', 'handlers.py']) as runner:
subprocess.run("kubectl apply -f obj.yaml",
shell=True, check=True)
time.sleep(1)
assert runner.exit_code == 0
assert runner.exception is None
assert 'Created' in runner.output
The built-in testing toolkit runs your operator in-process for integration tests. Combine with kubectl commands to verify end-to-end behavior.
Kopf watches Kubernetes resources and translates raw watch events into high-level causes for your handlers.
Kopf opens watch streams to the Kubernetes API for the resource types you register handlers for.
Raw events are analyzed to determine what actually changed: creation, update with diffs, deletion, or resumption after restart.
Matching handlers are invoked with rich kwargs: spec, diff, old/new, logger, and more. Results are persisted.
Custom resources, built-in resources, multiple types in one operator. Both cluster-scoped and namespaced.
import kopf
# Custom resources
@kopf.on.create('kopfexamples')
def on_example_create(spec, **_):
return {'ready': True}
# Built-in resources
@kopf.on.create('', 'v1', 'pods')
def on_pod_create(name, namespace, **_):
print(f"New pod: {namespace}/{name}")
# Namespaced filtering
@kopf.on.update('apps', 'v1', 'deployments',
labels={'managed-by': 'kopf'})
def on_deploy_update(diff, **_):
for op, field, old, new in diff:
print(f"{field}: {old} -> {new}")
From install to a running operator in three steps.
pip install kopf
import kopf
@kopf.on.create('kopfexamples')
def create_fn(spec, name, **_):
print(f"Created {name} with spec: {spec}")
kopf run handlers.py --verbose
For production, package it in a Docker image:
FROM python:3.14
ADD . /src
RUN pip install kopf
CMD kopf run /src/handlers.py --verbose
Kopf handles the infrastructure so you can focus on what matters: your domain logic.