One of my favorite new features of JavaScript and TypeScript is explicit resource management. It brings new syntax, using foobar = ..., that enables RAII, reducing boilerplate when managing the lifecycle of various resources.
In this article, I will explore this feature as implemented in TypeScript 5.2.0 with the disposablestack polyfill. I will cover both sync and async resources, DisposableStack/AsyncDisposableStack, and a non-obvious mistake I made when using the new feature. Along the way, I will also use some newer features of Node.js that some people might not know about yet.
One of the simpler kinds of resource that a JavaScript or TypeScript programmer might encounter is an event subscription. Its lifecycle begins when subscribing to an event, and ends when unsubscribing from it. And in a lot of cases, forgetting to properly unsubscribe from an event will lead to memory leaks - an event handler is often a closure that retains a reference to the event emitter object, creating a reference cycle:
let listener = new SomeListener();
let emitter = new HeavyObject();
emitter.on("event", () => listener.onEvent(emitter));
/* ... */
emitter = null;
// emitter won't be garbage collected
// as long as listener is alive
Using event subscriptions as an example, let's see what the new resource management syntax looks like. First, to implement the lifecycle logic:
The Disposable protocol requires objects to have a [Symbol.dispose]() method - this method will be called to free the resource.
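Here is a minimal sketch of how subscribe() could look. It assumes the emitter is a Node.js EventEmitter and that Symbol.dispose exists at runtime (through the disposablestack polyfill or a recent Node.js release); the parameter names are assumptions of this sketch, not a definitive implementation.

```typescript
// src/event-subscription.ts (a sketch under the assumptions stated above)
import { EventEmitter } from "node:events";

/**
 * Subscribes `listener` to `event` and returns a Disposable
 * that removes the subscription when it is disposed of.
 */
export function subscribe(
  emitter: EventEmitter,
  event: string,
  listener: (...args: any[]) => void,
): Disposable {
  // the lifecycle begins: subscribe to the event
  emitter.on(event, listener);
  return {
    // the lifecycle ends: unsubscribe from the event
    [Symbol.dispose]: () => {
      emitter.off(event, listener);
    },
  };
}
```

The returned object holds no state of its own: the closure over `emitter`, `event` and `listener` is all it needs to undo the subscription.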
To demonstrate this resource's usage, I will write a unit test for subscribe() using one of the newer Node.js features - a built-in test runner:
// src/event-subscription.test.ts
import { subscribe } from "./event-subscription.js";
import assert from "node:assert/strict";
import { EventEmitter } from "node:events";
import { describe, it } from "node:test";

describe("event-subscription", () => {
  it("is disposed at scope exit", () => {
    const expectedEvents = [1, 2, 3];
    const actualEvents: number[] = [];

    const obj = new EventEmitter();
    const fn = (e: number) => actualEvents.push(e);

    {
      // initializing the resource with a `using` declaration
      using guard = subscribe(obj, "event", fn);

      // the resource is alive till the end of the variable scope
      for (const e of expectedEvents) obj.emit("event", e);

      // end of scope for `guard`
      // guard[Symbol.dispose]() will be called here
    }

    obj.emit("event", 123);

    assert.deepEqual(actualEvents, expectedEvents);
    assert.equal(obj.listenerCount("event"), 0);
  });
});
When talking about resource lifecycle in Node.js, most people really mean the ones I'll call async resources. They include open files, sockets, database connections - in short, any resources that fit the following usage model:
// `undefined` until open() succeeds, hence the `?.` in `finally`
let resource: Resource | undefined;
try {
  // the resource is initialized with an async method
  resource = await Resource.open();

  // doing stuff with resource
} finally {
  // the resource is freed with an async method
  await resource?.close();
}
At first glance, it's not really clear why the new syntax was introduced. I mean, we already have finally, right? But as soon as we have to deal with several resources at once, the boilerplate starts to pile up:
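A sketch of how this pile-up might look with just two resources. The Resource interface and the useTwoResources helper are hypothetical names used only for this illustration; each extra resource adds one more level of try/finally nesting.

```typescript
// A hypothetical resource shape, used only for this illustration.
interface Resource {
  close(): Promise<void>;
}

export async function useTwoResources(
  openA: () => Promise<Resource>,
  openB: () => Promise<Resource>,
  body: (a: Resource, b: Resource) => Promise<void>,
): Promise<void> {
  // each variable has to be mutable and declared outside of its `try` block
  let a: Resource | undefined;
  try {
    a = await openA();

    let b: Resource | undefined;
    try {
      b = await openB();
      await body(a, b);
    } finally {
      // `b` is closed first, because it was opened last
      await b?.close();
    }
  } finally {
    // `a` is closed even if opening `b` has failed
    await a?.close();
  }
}
```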
Adding to that, the try and finally blocks are different scopes, so we always need to declare mutable variables, instead of using const.
The new using syntax makes this much more manageable:
// src/file.test.ts
import { openFile } from "./file.js";
import assert from "node:assert/strict";
import { describe, it } from "node:test";

describe("file", () => {
  it("is disposed at scope exit", async () => {
    {
      await using file = await openFile("dist/test.txt", "w");
      await file.writeFile("test", "utf-8");
    }

    {
      await using file = await openFile("dist/test.txt", "r");
      assert.equal(await file.readFile("utf-8"), "test");
    }
  });
});
Notice the await using file = await .... There are two awaits here. The first await means async disposal - that is, executing await file[Symbol.asyncDispose]() at the end of scope. The second await means async initialization - it is, in fact, just a regular await openFile() expression.
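To make the two awaits more tangible, here is a rough hand-written equivalent of an `await using` block. It is only an approximation: the real semantics also fall back to Symbol.dispose when there is no Symbol.asyncDispose, and wrap errors thrown during disposal into a SuppressedError, both of which this sketch ignores. The helper name withAsyncResource is hypothetical.

```typescript
export async function withAsyncResource<T extends AsyncDisposable>(
  init: () => Promise<T>,
  body: (resource: T) => Promise<void>,
): Promise<void> {
  // the second `await`: a regular async initialization
  const resource = await init();
  try {
    // the scope in which the resource is alive
    await body(resource);
  } finally {
    // the first `await`: async disposal at the end of the scope
    await resource[Symbol.asyncDispose]();
  }
}
```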
I'll implement openFile as a thin wrapper over the existing fs.FileHandle of Node.js.
// src/file.ts
import "disposablestack/auto";
import * as fs from "node:fs/promises";
import { Writable } from "node:stream";

// the type of our resource combines AsyncDisposable and fs.FileHandle
export interface DisposableFile extends fs.FileHandle, AsyncDisposable {
  // this helper method will become useful later
  writableWebStream(options?: fs.CreateWriteStreamOptions): WritableStream;
}

export async function openFile(
  path: string,
  flags?: string | number,
): Promise<DisposableFile> {
  const file = await fs.open(path, flags);

  // using Object.assign() to monkey-patch the disposal function into the object
  return Object.assign(file, {
    [Symbol.asyncDispose]: () => file.close(),
    writableWebStream: (
      options: fs.CreateWriteStreamOptions = { autoClose: false },
    ) => Writable.toWeb(file.createWriteStream(options)),
  });
}
Let's run the tests:
$ npm test | grep file
# Subtest: file
ok 2 - file
The "async-sync": mutexes
At first glance, the await using foo = await ... syntax can seem needlessly repetitive. But the thing is, there are resources that only require the initialization to be async, as well as those that only require async disposal.
As a demonstration of an "async init - sync dispose" resource, here is a RAII mutex:
// src/mutex.test.ts
import { Mutex } from "./mutex.js";
import assert from "node:assert/strict";
import { describe, it } from "node:test";
import { setTimeout as sleep } from "node:timers/promises";

describe("mutex-guard", () => {
  it("is disposed at scope exit", async () => {
    const mutex = new Mutex();
    let value: number = 0;

    const task = async () => {
      for (let i = 0; i < 5; i++) {
        // async init - might have to wait until mutex becomes free
        // sync dispose - just notifying other awaiters
        using guard = await mutex.acquire();

        // the scope of `guard` becomes a critical section
        const newValue = value + 1;
        await sleep(100);
        value = newValue;

        // comment out the `using guard` line to see a race condition
      }
    };

    await Promise.all([task(), task()]);

    assert.equal(value, 10);
  });
});
I implemented Mutex as an async factory of Disposable objects:
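A minimal sketch of such a Mutex, assuming a simple FIFO queue of waiters and a runtime that provides Symbol.dispose. The internals (the `#locked` flag and the `#waiters` array) are assumptions of this sketch; only the acquire() method returning a Promise of a Disposable comes from the test above.

```typescript
// src/mutex.ts (a sketch under the assumptions stated above)
export class Mutex {
  #locked = false;
  #waiters: Array<() => void> = [];

  /** Async init: resolves with a guard once the mutex becomes free. */
  async acquire(): Promise<Disposable> {
    if (this.#locked) {
      // wait until the current holder hands the lock over to us;
      // `#locked` stays true during the handover
      await new Promise<void>((resolve) => this.#waiters.push(resolve));
    } else {
      this.#locked = true;
    }

    let released = false;
    return {
      // sync dispose: just notify the next awaiter, if any
      [Symbol.dispose]: () => {
        if (released) return; // disposing a guard twice is a no-op
        released = true;
        const next = this.#waiters.shift();
        if (next) next();
        else this.#locked = false;
      },
    };
  }
}
```

Handing the lock directly to the next waiter, instead of unlocking and letting everyone race for it, keeps the acquisition order fair and prevents a late caller from slipping in between.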
As an example of a "sync init - async dispose" object, here is a simple task queue:
// src/task-queue.test.ts
import { TaskQueue } from "./task-queue.js";
import assert from "node:assert/strict";
import { describe, it } from "node:test";
import { setTimeout as sleep } from "node:timers/promises";

describe("task-queue", () => {
  it("is disposed at scope exit", async () => {
    let runningTaskCount = 0;
    let maxRunningTaskCount = 0;

    const task = async () => {
      runningTaskCount += 1;
      maxRunningTaskCount = Math.max(maxRunningTaskCount, runningTaskCount);
      await sleep(100);
      runningTaskCount -= 1;
    };

    {
      await using queue = new TaskQueue({ concurrency: 2 });

      queue.push(task);
      queue.push(task);
      queue.push(task);
      queue.push(task);

      // at the end of scope, it awaits all remaining tasks in the queue
    }

    assert.equal(runningTaskCount, 0);
    assert.equal(maxRunningTaskCount, 2);
  });
});
The implementation is mostly straightforward:
Task queue implementation
// src/task-queue.ts
import "disposablestack/auto";
import { EventEmitter, once } from "node:events";

export type Task = () => Promise<void>;

export class TaskQueue extends EventEmitter {
  // it's not quite obvious yet, but this field is a very important detail
  readonly resources = new AsyncDisposableStack();

  #concurrency: number;
  #tasks: Task[] = [];
  #runningTaskCount: number = 0;

  constructor(options: { concurrency: number }) {
    super();
    this.#concurrency = options.concurrency;
    this.on("taskFinished", () => this.#runNextTask());
  }

  push(task: Task): void {
    this.#tasks.push(task);
    this.#runNextTask();
  }

  #runNextTask(): void {
    if (this.#runningTaskCount >= this.#concurrency) return;

    // shift() returns undefined when the queue is empty
    const nextTask = this.#tasks.shift();
    if (!nextTask) return;

    this.#runningTaskCount += 1;
    nextTask()
      .catch((error) => {
        this.emit("error", error);
      })
      .finally(() => {
        this.#runningTaskCount -= 1;
        this.emit("taskFinished");
      });
  }

  async [Symbol.asyncDispose](): Promise<void> {
    // first, wait for all the queued and running tasks to finish
    while (this.#tasks.length > 0 || this.#runningTaskCount > 0) {
      await once(this, "taskFinished").catch(() => {});
    }
    // only then dispose of the attached resources
    await this.resources.disposeAsync();
  }
}
As a simple exercise, let's write a function that uses all four of the resources defined earlier:
// src/fetch-cat.ts
import { subscribe } from "./event-subscription.js";
import { openFile } from "./file.js";
import { Mutex } from "./mutex.js";
import { TaskQueue } from "./task-queue.js";

/**
 * Fetch all `urls` with HTTP GET requests, concatenate all the responses in any order,
 * and write them to `outPath`.
 *
 * @param options.concurrency max number of concurrent requests
 * @param options.onError is called on request error
 */
export async function fetchCat(
  options: {
    urls: string[],
    outPath: string,
    concurrency: number,
    onError: (error: any) => void,
  },
): Promise<void> {
  const { urls, outPath, concurrency, onError } = options;

  // a task queue to limit the concurrency
  await using taskQueue = new TaskQueue({ concurrency });

  // an event subscription treated as a resource
  using errorSubscription = subscribe(taskQueue, "error", onError);

  // synchronize file writes with a mutex
  const outFileMutex = new Mutex();

  // ensure the file is closed at the end of scope
  await using outFile = await openFile(outPath, "w");

  for (const url of urls) {
    taskQueue.push(async () => {
      // a browser-compatible global fetch() is also one
      // of the newer Node.js features
      const response = await fetch(url);
      {
        using outFileGuard = await outFileMutex.acquire();

        // as are the browser-compatible data streams
        await response.body?.pipeTo(outFile.writableWebStream());
      }
    });
  }
}
Wrapping this up into a script with another Node.js feature - a built-in CLI args parser:
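A sketch of how the argument parsing part of such a script might look with parseArgs from node:util. The file name, the flag names (`--out`, `--concurrency`) and the default values are assumptions of this sketch, and the call to fetchCat is left as a comment to keep the example self-contained.

```typescript
// src/main.ts (a sketch; the file name and the flag names are hypothetical)
import { parseArgs } from "node:util";

export interface CliOptions {
  urls: string[];
  outPath: string;
  concurrency: number;
}

export function parseCliArgs(args: string[]): CliOptions {
  const { values, positionals } = parseArgs({
    args,
    options: {
      out: { type: "string", short: "o" },
      // parseArgs only knows strings and booleans, so the number is parsed below
      concurrency: { type: "string", short: "c" },
    },
    // the urls are passed as positional arguments
    allowPositionals: true,
  });

  const concurrency = Number(values.concurrency ?? "2");
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new Error("--concurrency must be a positive integer");
  }

  return {
    urls: positionals,
    outPath: values.out ?? "./cat.html",
    concurrency,
  };
}

// Hypothetical wiring, assuming fetchCat is imported from ./fetch-cat.js:
// await fetchCat({
//   ...parseCliArgs(process.argv.slice(2)),
//   onError: (error) => console.error(error),
// });
```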
Huh... The script won't finish, and the output is empty. Looks like a bug.
The non-obvious mistake
To find my mistake, let's inspect the code a bit closer:
// src/fetch-cat.ts
import { subscribe } from "./event-subscription.js";
import { openFile } from "./file.js";
import { Mutex } from "./mutex.js";
import { TaskQueue } from "./task-queue.js";

export async function fetchCat(
  options: {
    urls: string[],
    outPath: string,
    concurrency: number,
    onError: (error: any) => void,
  },
): Promise<void> {
  const { urls, outPath, concurrency, onError } = options;

  // notice the resource init order
  await using taskQueue = new TaskQueue({ concurrency });
  using errorSubscription = subscribe(taskQueue, "error", onError);
  await using outFile = await openFile(outPath, "w");

  const outFileMutex = new Mutex();

  for (const url of urls) {
    taskQueue.push(async () => {
      const response = await fetch(url);
      {
        using outFileGuard = await outFileMutex.acquire();
        await response.body?.pipeTo(outFile.writableWebStream());
      }
    });
  }

  // This is the end of scope for both `outFile` and `taskQueue`.
  // They are disposed of in reverse declaration order.
  // That means that `outFile` will be closed before `taskQueue` is finished!
}
There is a logic error here: the outFile lifetime should be bound not by the current scope, but by the lifetime of all the remaining queue tasks. The file should be closed only when all the tasks are done.
Sadly, Node.js isn't smart enough to automatically prolong the lifetimes of values captured by a closure. That means I'll have to bind them manually, using AsyncDisposableStack - a container that aggregates several AsyncDisposables together, freeing them all at once.
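To illustrate the idea of such a container, here is a deliberately simplified stand-in. The class name MiniAsyncStack is hypothetical, and this is not how the polyfill is implemented. Like the real AsyncDisposableStack, it accepts both sync and async disposables through use() and releases them in reverse order of registration in disposeAsync(); unlike the real one, it does no error aggregation and no checks for repeated use after disposal.

```typescript
type AnyDisposable = Disposable | AsyncDisposable;

export class MiniAsyncStack implements AsyncDisposable {
  #resources: AnyDisposable[] = [];

  /** Registers a resource and returns it, so the call can be used inline. */
  use<T extends AnyDisposable>(resource: T): T {
    this.#resources.push(resource);
    return resource;
  }

  /** Disposes of everything registered so far, last registered first. */
  async disposeAsync(): Promise<void> {
    while (this.#resources.length > 0) {
      const resource = this.#resources.pop()!;
      // prefer async disposal when the resource supports it
      const asyncDispose = (resource as Partial<AsyncDisposable>)[Symbol.asyncDispose];
      if (asyncDispose) {
        await asyncDispose.call(resource);
      } else {
        (resource as Disposable)[Symbol.dispose]();
      }
    }
  }

  /** Makes the stack itself usable as an async resource. */
  [Symbol.asyncDispose](): Promise<void> {
    return this.disposeAsync();
  }
}
```

Because the stack is itself an AsyncDisposable, the lifetime of everything it holds can be tied to whichever object owns the stack.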
// src/fetch-cat.ts
import { subscribe } from "./event-subscription.js";
import { openFile } from "./file.js";
import { Mutex } from "./mutex.js";
import { TaskQueue } from "./task-queue.js";

export async function fetchCat(
  options: {
    urls: string[],
    outPath: string,
    concurrency: number,
    onError: (error: any) => void,
  },
): Promise<void> {
  const { urls, outPath, concurrency, onError } = options;

  await using taskQueue = new TaskQueue({ concurrency });

  // The `taskQueue.resources` field is an AsyncDisposableStack.
  // As part of TaskQueue's contract, it is disposed only after
  // all the tasks are done

  const errorSubscription = subscribe(taskQueue, "error", onError);
  taskQueue.resources.use(errorSubscription); // binding the lifetimes

  const outFile = await openFile(outPath, "w");
  taskQueue.resources.use(outFile); // binding the lifetimes

  const outFileMutex = new Mutex();

  for (const url of urls) {
    taskQueue.push(async () => {
      const response = await fetch(url);
      {
        using outFileGuard = await outFileMutex.acquire();
        await response.body?.pipeTo(outFile.writableWebStream());
      }
    });
  }

  // Only the `taskQueue` resource is bound directly to this scope.
  // When it is disposed of, it first awaits all remaining queue tasks,
  // and only then disposes of all the `taskQueue.resources`.
  // Only then will the `outFile` be closed.
}
Excellent! All the urls (excluding fakes) were fetched and written to ./cat.html, as intended.
As a general rule, all Disposable resources that hold sub-resources should hold them in a DisposableStack, disposing of it inside their own [Symbol.dispose]() method. The same goes for AsyncDisposable and AsyncDisposableStack, of course.
article[Symbol.dispose]()
The dedicated RAII syntax isn't a novel idea for a programming language - C# has it, so does Python, and now JavaScript and TypeScript. This implementation, of course, isn't perfect, and has its own share of non-obvious behaviors. But still, I am glad that we finally have such a syntax - and, I hope, I managed to explain why!