Airlock
Express side effects anywhere. Control whether & when they escape.
Over the holidays I built Airlock (repo: https://github.com/ejucovy/airlock), a Python library to centralize control over async tasks and other side effects. It helps you decouple the intent to run a side effect from actually running it.

import airlock

class Order:
    def process(self):
        self.status = "processed"
        airlock.enqueue(notify_warehouse, self.id)
        airlock.enqueue(send_confirmation_email)

with airlock.scope():
    Order(1).process()
    do_other_stuff()
# `notify_warehouse` and `send_confirmation_email` only fire here,
# and only if we reach the end of the scope without an exception

Or if you’re in Django, a middleware will handle the scope implicitly:

## settings.py
MIDDLEWARE = [
    # ...
    "airlock.integrations.django.AirlockMiddleware",
]

## views.py
def checkout(request):
    order = Order.objects.get(id=1)
    order.process()
    return HttpResponse("OK")
# All side effects dispatch here after response + transaction commit

What does it do?
When you airlock.enqueue(some_function), your function doesn’t run immediately.
Later on, the “top level code” that called your function gets to decide whether your enqueued function should be executed or discarded.
That top level calling code is in charge of both when the decision should happen and what should happen, by defining a scope and a policy.
Decoupling intent to run a side effect from actually running it — and treating intents as something more like data — creates some nice benefits:
Separation of concerns: side effects can be colocated with the things that trigger them, while execution contexts can determine whether and when they should be dispatched.
Predictable timing: buffered side effects all get released at a single point in time, which might be after a database transaction has committed, after a successful HTTP response has been generated, after a human has audited the side effects, etc.
Centralized policy control: a scope can decide to silently drop all side effects, selectively release only the ones that won’t notify an end user, blow up if a particular intent got enqueued at all, release them all but only between 9am-5pm…
Auditability: log whenever a side effect is enqueued, released, or dropped.
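
To make the "intents as data" idea concrete, here is a minimal sketch in plain Python. It does not use airlock's real classes: `Intent`, `release`, and the three policy functions are hypothetical names invented for illustration, and airlock's actual policy interface may look quite different. The point is only that once a side effect is recorded as data, "whether it runs" becomes an ordinary filtering decision.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Intent:
    """A recorded request to call func(*args, **kwargs) later (hypothetical)."""
    func: Callable[..., Any]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


# In this sketch a policy is just a predicate: True means "release".
def allow_all(intent: Intent) -> bool:
    return True


def drop_all(intent: Intent) -> bool:
    return False


def block_user_notifications(intent: Intent) -> bool:
    # Selective release: drop anything whose name starts with "notify_user".
    return not intent.func.__name__.startswith("notify_user")


def release(intents, policy):
    """Run the intents the policy allows; return the names of those that ran."""
    ran = []
    for intent in intents:
        if policy(intent):
            intent.func(*intent.args, **intent.kwargs)
            ran.append(intent.func.__name__)
    return ran


calls = []

def reindex_search(order_id):
    calls.append(("reindex", order_id))

def notify_user_by_email(order_id):
    calls.append(("email", order_id))


buffered = [Intent(reindex_search, (1,)), Intent(notify_user_by_email, (1,))]
released = release(buffered, block_user_notifications)
```

Here only `reindex_search` runs; swapping in `drop_all` would run nothing, without touching the code that created the intents.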
Why I built it
Airlock started from some needs and some curiosity. The needs:
In a large codebase with celery_task.delay() calls all over the place, how can we make calls more visible, and timing more predictable?
In a large codebase with API calls all over the place, how can we update local cache in fully-context-aware, full-outcome-aware batches?
When writing integration tests, how can we inspect which tasks were triggered, and how many?
When calling the same code from multiple places, how can we centralize policy control — things like “actually suppress all side effects during a management command” or “only run two of the side effects from an admin-triggered action”?
And the curiosity:
What is a ContextVar anyway? Can it help here?
Can I design and build a library with a nice API, proper layering of concerns, and 100% test coverage without typing a single line of code?
If I do, will I understand it?
I’m pretty happy with the results.
Full documentation is available here. (The documentation is the only part I typed myself.[1])
The core model
Airlock defines three orthogonal concerns, which are all designed for extensibility.
An airlock.Scope defines when side effects can escape.
An airlock.Policy defines whether side effects can escape at the end of a scope.
And an airlock.Executor defines how escaping side effects actually get invoked[2]: direct function calls, celery or another async task framework, thread pools, RPC…

from airlock.integrations.executors.celery import celery_executor

with airlock.scope(policy=airlock.DropAll(), executor=celery_executor):
    airlock.enqueue(my_task, some_param=123)

Nesting policies
Policies can be arbitrarily nested any time you’re within an airlock.scope:

with airlock.scope(policy=airlock.AllowAll()) as s:
    airlock.enqueue(task_a)
    with airlock.policy(airlock.DropAll()):
        airlock.enqueue(task_b)
    airlock.enqueue(task_c)
    # all three intents are here,
    # waiting for policy at the same time
    print(len(s.intents))  # -> 3
# when scope exits, any local policies apply first, so:
# * `task_a` executes
# * `task_b` is dropped
# * `task_c` executes

Nesting scopes
The scopes themselves can also be arbitrarily nested, so reusable libraries can opt to establish their own boundaries. Importantly, nested scopes don’t actually flush independently; they’re captured[3] recursively by any parent scopes instead:

def third_party_code_with_side_effects(a):
    with airlock.scope():
        airlock.enqueue(task_c)
    return a * 2

with airlock.scope() as outer:
    airlock.enqueue(task_a)
    with airlock.scope() as inner:
        airlock.enqueue(task_b)
        third_party_code_with_side_effects(5)
# `task_a`, `task_b`, and `task_c` all execute here

Wait, what? Why not local control?
If nested scopes were to flush by default, we would have an inverse flywheel. Airlock scopes defined deep in call stacks would recreate the same “side effects might get released anywhere” problem that airlock tries to solve.
So … if third-party code starts using airlock, you would lose control all over again.
With an “outer scopes maintain control” paradigm, multi-step operations stay well defined even when callees use scopes, without callers needing to know:

def checkout_cart(cart_id):
    with airlock.scope():
        validate_inventory(cart_id)  # May use scopes internally
        charge_payment(cart_id)      # May use scopes internally
        send_confirmation(cart_id)   # May use scopes internally
    # All effects dispatch only if we reach the end successfully

How does it work?
The core is all very simple — a few hundred lines of code.
The main implementation challenge is “how can we push stuff onto a buffer from anywhere in our call stack, without threading an argument through every function call we might ever make?”
The solution is a context variable. Airlock leans heavily on Python’s standard-library contextvars.ContextVar so that “the current scope that we’re running within” can be a global-ish object that stays safely isolated even in asyncio tasks and gevent/greenlet environments with concurrent work.
When we enter with airlock.scope(), the newly created Scope object gets pushed into _current_scope (a ContextVar).
When we exit a scope, that ContextVar gets reset to its prior value, so nested-scopes-as-stacks sort of just work for free[4].
When we’re not in any explicitly defined scope, it’s None, and any airlock.enqueue calls will loudly throw an error.
This means that airlock scopes are safe and should just do what you mean, both sequentially (one request/task/script after another in the same process) and concurrently (a single process switching between two or more requests/tasks/scripts at the same time).
At one point I got nervous about whether our buffers are really safe when running under greenlets, so Claude wrote me an extra 350 lines of test code to manually spawn fifty concurrent greenlets, each with its own isolated airlock scope, just to make me feel better.
Airlock’s guarantees
Airlock enforces some invariants to make life easier to reason about.
Policies cannot enqueue side effects; they just observe and filter. Intents come from your domain code, not from policy logic.[5]
Buffered effects escape only at flush; there’s no way for them to escape before the scope exits. The scope boundary is exclusively responsible for defining when effects can actually run.
Airlock fails closed; there’s no such thing as an implicit or default scope. If a developer tries to trigger a side effect outside of a defined scope, it raises a NoScopeError. This prevents tasks from firing in environments like a quick shell script or a poorly configured worker where you haven’t explicitly chosen a boundary and a policy.
Of course, nothing will stop you from running side-effect-ish code directly, even alongside airlock, if you want some stuff to just happen without controlled buffers. Not every async task is a side effect!

import airlock

class Order:
    def process(self):
        self.status = "processed"
        notify_warehouse.delay(self.id)  # Just run it the old fashioned way
        airlock.enqueue(send_confirmation_email)

with airlock.scope():
    Order(1).process()
    do_other_stuff()
# `notify_warehouse` dispatched right away, because airlock never saw it.
# But `send_confirmation_email` only fires here.

But if you do want to enforce it codebase-wide, there’s also a flake8 plugin that catches direct .delay() calls bypassing airlock.
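
The plugin itself is not reproduced here, but the kind of check it performs can be sketched with the standard-library `ast` module. `find_delay_calls` is a hypothetical helper written for this illustration; a real flake8 plugin would wrap similar logic in flake8's plugin interface and would probably need to be smarter about false positives.

```python
import ast


def find_delay_calls(source: str):
    """Return (line, column) for every call of the form <something>.delay(...)."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "delay"
        ):
            hits.append((node.lineno, node.col_offset))
    return sorted(hits)


sample = '''
def process(order):
    notify_warehouse.delay(order.id)
    airlock.enqueue(send_confirmation_email)
'''

hits = find_delay_calls(sample)   # flags the .delay() call, not the enqueue
```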
What else can it do?
I was thinking mainly of Django and async task queues when building airlock, but I think the pattern has some interesting applications in other contexts.
Game loops and UI redraws: create an airlock.Scope for each run of your main loop, flush the buffer on every tick.
Drop-in outbox pattern: implement a custom airlock.Policy that persists intents to a database on_enqueue, and an airlock.Executor that just updates a flag to signal “it’s okay for the worker process to pick this up now.”
AI tool use with human-in-the-loop: when an LLM calls a tool, airlock.enqueue the corresponding function as a side effect. Instead of executing it, render the “pending” intents to a UI and return a stub response to the AI so it can keep working/thinking/talking. Later, the human clicks “Approve” on two buffered intents, and the airlock.Policy filters the rest.
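
As a rough illustration of the outbox idea, here is the database side of it with an in-memory SQLite table. Nothing below touches airlock: `persist_on_enqueue` stands in for what a persisting policy might do at enqueue time, `release` stands in for the flag-flipping executor, and the table layout and function names are all assumptions made for this sketch.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox ("
    " id INTEGER PRIMARY KEY,"
    " task_name TEXT NOT NULL,"
    " payload TEXT NOT NULL,"
    " released INTEGER NOT NULL DEFAULT 0)"
)


def persist_on_enqueue(task_name, **kwargs):
    """Store the intent as a row; it is not yet visible to workers."""
    cur = conn.execute(
        "INSERT INTO outbox (task_name, payload) VALUES (?, ?)",
        (task_name, json.dumps(kwargs)),
    )
    return cur.lastrowid


def release(intent_id):
    """Flip the flag that tells a worker this row may be picked up."""
    conn.execute("UPDATE outbox SET released = 1 WHERE id = ?", (intent_id,))


def ready_for_worker():
    """What a worker process would poll for."""
    rows = conn.execute(
        "SELECT task_name, payload FROM outbox WHERE released = 1 ORDER BY id"
    )
    return [(name, json.loads(payload)) for name, payload in rows]


first = persist_on_enqueue("notify_warehouse", order_id=1)
second = persist_on_enqueue("send_confirmation_email", order_id=1)
pending_before = ready_for_worker()   # nothing released yet
release(first)
pending_after = ready_for_worker()    # only the released row is visible
```

In a real deployment the insert would share a transaction with the domain write, which is the property that makes the outbox pattern worth the extra table.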
[1] The code was produced in 97 commits between Christmas and New Year’s Eve through:
a lengthy ChatGPT conversation about feasibility and approach
a Claude Code for Web session (“implement the library designed in CONVERSATION.md”)
several rounds of code review by ChatGPT and Gemini in Antigravity
…that were pasted back into Claude Code for Web
(Claude was receptive while maintaining self-respect: “Good critique.” “Analysis is solid.” “Where I’d Push Back or Add Nuance…”)
and then another several dozen Claude Code for Web sessions to refine the design and API, catch edge cases, do the packaging, and write the tests
[2] I think the Executor concept is not fully baked, because it’s currently coupled fairly inherently with “what types of things get passed in to airlock.enqueue” (plain callables, decorated @task functions, etc.), which in turn is the responsibility of the “deep code” that’s definitionally outside the control of airlock and its user. But for a v0.1a I think it’s good enough.
[3] This is customizable too: a custom airlock.Scope class can allow its descendant scopes to independently release all, some, or none of their buffered intents.
[4] We also maintain another ContextVar to manually maintain the stack so that inner scopes can directly signal to their ancestors that they’re planning to flush.
[5] This is enforced via yet another ContextVar that just keeps track of whether we’re currently in policy code, and throws if we try to enqueue an intent from in there.


