Одной из самых интересных грядущих новинок JavaScript и TypeScript для меня является явное управление ресурсами. Новый синтаксис using foobar = ... реализует идиому RAII, позволяя писать намного менее многословный код, управляющий какими-либо ресурсами.
В этой статье я хочу на примерах разобрать эту фичу — в том виде, в котором она сейчас доступна в TypeScript 5.2.0-beta с полифиллом disposablestack. Я рассмотрю синхронные и асинхронные ресурсы, DisposableStack/AsyncDisposableStack, а также приведу пример неочевидного бага, в который попался я сам. По пути я также коснусь нескольких других нововведений Node.js, про которые, возможно, еще знают не все.
Также понадобится настроить IDE так, чтобы она тоже поддерживала новый синтаксис. Я пользуюсь Visual Studio Code; для нее нужно прописать в настройках проекта путь к локальному компилятору, а также переключиться на стандартный форматтер кода — prettier еще не переваривает новый синтаксис:
Наконец, понадобится настроить сам компилятор. Для поддержки нового синтаксиса нужны опции "lib": "esnext" или "lib": "esnext.disposable". Я также включаю поддержку ES-модулей.
Самый простой пример ресурса, за которым в JavaScript и TypeScript нужно следить вручную — это подписки на события. Конкретнее, от них во многих случаях нужно не забывать отписываться. В замыкании-обработчике события зачастую есть ссылка на объект-источник, а у источника есть ссылка на обработчик, что порождает цикл из ссылок на объекты в куче. Это может порождать неявные "висящие" ссылки, которые не дадут GC собрать эту память:
letlistener=newSomeListener();letemitter=newHeavyObject();emitter.on("event",()=>listener.onEvent(emitter));/* ... */emitter=null;// emitter не соберется до тех пор, пока жив listener
Давайте на примере подписок посмотрим, как выглядит синтаксис управления ресурсами. Вот создание объекта-ресурса:
Такие объекты должны удовлетворять интерфейсу Disposable — иметь метод [Symbol.dispose], который и будет осуществлять освобождение ресурсов.
В качестве примера использования, напишем юнит-тест для функции subscribe(), используя еще одну из недавних фич Node.js — встроенную поддержку запуска тестов:
// src/event-subscription.test.tsimport{subscribe}from"./event-subscription.js";importassertfrom"node:assert/strict";import{EventEmitter}from"node:events";import{describe,it}from"node:test";describe("event-subscription",()=>{it("is disposed at scope exit",()=>{constexpectedEvents=[1,2,3];constactualEvents:number[]=[];constobj=newEventEmitter();constfn=(e:number)=>actualEvents.push(e);{// инициализируем ресурс с помощью ключевого слова usingusingguard=subscribe(obj,"event",fn);// ресурс живет до тех пор, пока мы не выйдем из области// видимости переменной guardfor(consteofexpectedEvents)obj.emit("event",e);// конец области видимости// здесь выполняется guard[Symbol.dispose]()}obj.emit("event",123);assert.deepEqual(actualEvents,expectedEvents);assert.equal(obj.listenerCount("event"),0);});});
Когда говорят про ручное управление ресурсами в контексте Node.js, чаще всего имеют в виду то, что я назову асинхронными ресурсами. Это открытые файлы, сокеты, подключения к базе данных — другими словами, те, что укладываются в такую модель использования:
Казалось бы, никакой специальный синтаксис для этого и не нужен: у нас есть finally, чего еще хотеть? Однако многословность такого подхода становится видна, если ресурсов несколько:
К тому же, неудобства доставляет то, что области видимости внутри блоков try и finally разные. Плюс, есть и место для неочевидных багов: всегда ли вы помнили о том, что в finally нужен знак ??
Новый синтаксис using делает использование ресурсов более удобным:
// src/file.test.tsimport{openFile}from"./file.js";importassertfrom"node:assert/strict";import{describe,it}from"node:test";describe("file",()=>{it("is disposed at scope exit",async()=>{{awaitusingfile=awaitopenFile("dist/test.txt","w");awaitfile.writeFile("test","utf-8");}{awaitusingfile=awaitopenFile("dist/test.txt","r");assert.equal(awaitfile.readFile("utf-8"),"test");}});});
Обратите внимание на запись await using file = await .... Первый await здесь указывает на асинхронное освобождение ресурсов: при выходе области видимости будет выполнен await file[Symbol.asyncDispose](). Второй — на асинхронную инициализацию: это просто вызов асинхронной openFile().
Давайте посмотрим, как можно реализовать такую обертку для уже существующего ресурса. В нашем примере это будет fs.FileHandle.
// src/file.tsimport"disposablestack/auto";import*asfsfrom"node:fs/promises";import{Writable}from"node:stream";// тип нашего ресурса — объединение AsyncDisposable и исходного fs.FileHandleexportinterfaceDisposableFileextendsfs.FileHandle,AsyncDisposable{// добавим также вспомогательную функцию, которая понадобится нам позжеwritableWebStream(options?:fs.CreateWriteStreamOptions):WritableStream;}exportasyncfunctionopenFile(path:string,flags?:string|number):Promise<DisposableFile>{constfile=awaitfs.open(path,flags);// добавим функции прямо в объект file с помощью Object.assignreturnObject.assign(file,{[Symbol.asyncDispose]:()=>file.close(),writableWebStream:(options:fs.CreateWriteStreamOptions={autoClose:false})=>Writable.toWeb(file.createWriteStream(options)),});}
Запустим наши тесты:
$ npm test|grep file# Subtest: fileok 2 - file
"async-sync": мьютексы
Синтаксис await using foo = await ... может казаться не очень-то нужным повторением. Но на самом деле, несложно привести примеры ресурсов, у которых будут асинхронными только инициализация или только освобождение.
Как пример ресурса с асинхронной инициализацией, но синхронным освобождением приведу один из моих любимых применений паттерна RAII — мьютекс:
// src/mutex.test.tsimport{Mutex}from"./mutex.js";importassertfrom"node:assert/strict";import{describe,it}from"node:test";import{setTimeoutassleep}from"node:timers/promises";describe("mutex-guard",()=>{it("is disposed at scope exit",async()=>{constmutex=newMutex();letvalue:number=0;consttask=async()=>{for(leti=0;i<5;i++){// инициализация асинхронная - может понадобиться ожидание// освобождение синхронное - отправка сигнала другим ожидающимusingguard=awaitmutex.acquire();// до конца области видимости guard - критическая секцияconstnewValue=value+1;awaitsleep(100);value=newValue;// закомментируйте строчку using guard, чтобы увидеть// классический пример состояния гонки}};awaitPromise.all([task(),task()]);assert.equal(value,10);});});
Реализован наш Mutex как асинхронная фабрика Disposable-объектов:
Как пример объекта с синхронной инициализацией и асинхронным освобождением, рассмотрим очередь задач:
// src/task-queue.test.tsimport{TaskQueue}from"./task-queue.js";importassertfrom"node:assert/strict";import{describe,it}from"node:test";import{setTimeoutassleep}from"node:timers/promises";describe("task-queue",()=>{it("is disposed at scope exit",async()=>{letrunningTaskCount=0;letmaxRunningTaskCount=0;consttask=async()=>{runningTaskCount+=1;maxRunningTaskCount=Math.max(maxRunningTaskCount,runningTaskCount);awaitsleep(100);runningTaskCount-=1;};{awaitusingqueue=newTaskQueue({concurrency:2});queue.push(task);queue.push(task);queue.push(task);queue.push(task);// в конце области видимости ожидаем завершения всех задач в очереди}assert.equal(runningTaskCount,0);assert.equal(maxRunningTaskCount,2);});});
Ее реализация не слишком интересная, за исключением одной детали, о которой поговорим позже:
Реализация очереди
// src/task-queue.tsimport"disposablestack/auto";import{EventEmitter,once}from"node:events";exporttypeTask=()=>Promise<void>;exportclassTaskQueueextendsEventEmitter{// сейчас это еще не совсем очевидно, но это поле - очень важная детальreadonlyresources=newAsyncDisposableStack();#concurrency:number;#tasks:Task[]=[];#runningTaskCount:number=0;constructor(options:{concurrency:number}){super();this.#concurrency=options.concurrency;this.on("taskFinished",()=>this.#runNextTask());}push(task:Task):void{this.#tasks.push(task);this.#runNextTask();}
#runNextTask():void{if(this.#runningTaskCount>=this.#concurrency)return;constnextTask=this.#tasks.shift()!;if(!nextTask)return;this.#runningTaskCount+=1;nextTask().catch((error)=>{this.emit("error",error);}).finally(()=>{this.#runningTaskCount-=1;this.emit("taskFinished");});}async[Symbol.asyncDispose]():Promise<void>{while(this.#tasks.length>0||this.#runningTaskCount>0){awaitonce(this,"taskFinished").catch(()=>{});}awaitthis.resources.disposeAsync();}}
Для практики, напишем функцию fetchCat(), которая будет использовать все четыре определенных нами ресурса:
// src/fetch-cat.tsimport{subscribe}from"./event-subscription.js";import{openFile}from"./file.js";import{Mutex}from"./mutex.js";import{TaskQueue}from"./task-queue.js";/**
* Забрать GET-запросами данные со всех `urls` и склеить по порядку в файл `outPath`.
* Порядок страниц в выходном файле не гарантируется.
*
* @paramoptions.concurrency максимальное количество одновременных запросов
* @paramoptions.onError вызывается в случае ошибки при получении одного из urls
*/exportasyncfunctionfetchCat(options:{urls:string[],outPath:string,concurrency:number,onError:(error:any)=>void,},):Promise<void>{const{urls,outPath,concurrency,onError}=options;// для ограничения concurrency воспользуемся очередью задачawaitusingtaskQueue=newTaskQueue({concurrency});// подписку на событие тоже используем как ресурсusingerrorSubscription=subscribe(taskQueue,"error",onError);// синхронизируем запись в выходной файл мьютексомconstoutFileMutex=newMutex();// файл будет закрыт в конце области видимостиawaitusingoutFile=awaitopenFile(outPath,"w");for(consturlofurls){taskQueue.push(async()=>{// глобальный fetch() - еще одно недавнее нововведение Node.js// по интерфейсу он совместим с браузернымconstresponse=awaitfetch(url);{usingoutFileGuard=awaitoutFileMutex.acquire();// а еще можно использовать те же интерфейсы стримов, что и в браузереawaitresponse.body?.pipeTo(outFile.writableWebStream());}});}}
Опишем точку входа, распарсив агрументы встроенным в Node.js парсером — еще одна недавняя фича!
Хм, странно. Скрипт не завершается, а выходной файл пустой. Похоже на баг.
Неочевидный баг
Чтобы найти, в чем ошибка, рассмотрим код подробнее:
// src/fetch-cat.tsimport{subscribe}from"./event-subscription.js";import{openFile}from"./file.js";import{Mutex}from"./mutex.js";import{TaskQueue}from"./task-queue.js";exportasyncfunctionfetchCat(options:{urls:string[],outPath:string,concurrency:number,onError:(error:any)=>void,},):Promise<void>{const{urls,outPath,concurrency,onError}=options;// обратите внимание на порядок инициализации ресурсовawaitusingtaskQueue=newTaskQueue({concurrency});usingerrorSubscription=subscribe(taskQueue,"error",onError);awaitusingoutFile=awaitopenFile(outPath,"w");constoutFileMutex=newMutex();for(consturlofurls){taskQueue.push(async()=>{constresponse=awaitfetch(url);{usingoutFileGuard=awaitoutFileMutex.acquire();awaitresponse.body?.pipeTo(outFile.writableWebStream());}});}// Здесь кончается область видимости у outFile и у taskQueue.// Освобождение ресурсов происходит в обратном порядке.// Получается, что outFile будет закрыт раньше, чем taskQueue закончится!}
На самом деле, логическая ошибка не исправится, если просто переставить местами ресурсы. Она заключается в том, что время жизни outFile должно быть привязано не к текущей области видимости, а ко времени жизни задач в очереди. Файл должен быть закрыт не раньше, чем все задачи в очереди завершатся.
К сожалению, Node.js не позволяет замыканиям продлевать время жизни захваченных ими ресурсов. Придется связать их явно. Но все-таки не совсем вручную — для аггрерации ресурсов используем класс AsyncDisposableStack — еще одну часть пропозала:
// src/fetch-cat.tsimport{subscribe}from"./event-subscription.js";import{openFile}from"./file.js";import{Mutex}from"./mutex.js";import{TaskQueue}from"./task-queue.js";exportasyncfunctionfetchCat(options:{urls:string[],outPath:string,concurrency:number,onError:(error:any)=>void,},):Promise<void>{const{urls,outPath,concurrency,onError}=options;awaitusingtaskQueue=newTaskQueue({concurrency});// Поле taskQueue.resources имеет тип AsyncDisposableStack.// Как часть контракта TaskQueue, оно освобождается в его dispose,// причем только после завершения всех задач.consterrorSubscription=subscribe(taskQueue,"error",onError);taskQueue.resources.use(errorSubscription);// связываем время жизниconstoutFile=awaitopenFile(outPath,"w");taskQueue.resources.use(outFile);// связываем время жизниconstoutFileMutex=newMutex();for(consturlofurls){taskQueue.push(async()=>{constresponse=awaitfetch(url);{usingoutFileGuard=awaitoutFileMutex.acquire();awaitresponse.body?.pipeTo(outFile.writableWebStream());}});}// К этой области видимости из ресурсов привязан только сам taskQueue.// При его освобождении сначала будут выполнены все задачи в очереди,// а потом освобожден весь стек taskQueue.resources.// Таким образом, файл будет корректно закрыт}
Отлично! Все (настоящие) страницы были загружены, а посмотрев в ./cat.html, можем убедиться, что загружены правильно и без гонок.
Классы DisposableStack и AsyncDisposableStack предназначены для аггрегации нескольких ресурсов в один. Как правило, любой Disposable-ресурс, если у него есть под-ресурсы, должен иметь свой DisposableStack, и освобождать его у себя в dispose(). С AsyncDisposable и AsyncDisposableStack — аналогично.
article[Symbol.dispose]()
Идея специального синтаксиса для паттерна RAII не нова — он есть как минимум в C# и в Python. Сегодня мы рассмотрели его реализацию из будущих версий JavaScript и TypeScript. У нее есть свои ограничения и неочевидные моменты. Но, несмотря на них, я очень рад появлению такого синтаксиса — и, надеюсь, смог объяснить, почему.