PLAY DEVELOPERS BLOG

HuluやTVerなどの日本最大級の動画配信を支える株式会社PLAYが運営するテックブログです。

HuluやTVerなどの日本最大級の動画配信を支える株式会社PLAYが運営するテックブログです。

動画配信技術を実装して理解する〜HTTP-FLV 配信編〜

こんにちは、技術推進室の石川です。「動画配信技術を実装して理解する」も2回目の記事となりました。不定期連載ですので、気ままに連載していこうと思います。 前回(第1回目)の記事はこちらになります。developers.play.jp

前回はRTMPを受信してFLVファイルに変換しましたが、今回はFLV出力をファイルではなくHTTPで配信する、いわゆるHTTP-FLV配信を実装してみたいと思います。今現在、FLVは使われてないように見えますが、実はFLVをHTTPのストリームとして配信しているという事例も存在します。今回は、その配信方式がどのような実装で成り立っているかを説明します。

では、HTTPでFLVを配信する部分を実際に実装して理解してみましょう。

HTTP-FLVとは?

HTTP-FLVとは、HTTPのストリーミングレスポンスを用いてFLVを送る配信方式です。RTMPのような双方向通信を使わない、片方向の一方的な配信で使われています。ブラウザではfetchでHTTPのストリーミングレスポンスを受けられるので、MSE(MMS)が使える環境ではブラウザでの視聴も可能です。また、この延長線上で双方向の通信をしたい場合にはWebSocket-FLVが使われるようです。ブラウザ対応まで考慮するとRTMPのTCPよりHTTPなどのブラウザネイティブなスタックに載っていた方が便利だからでしょうか?

欠点としては、対応しているCDNが少ないという点が挙げられます。ほとんどのCDNはファイルを配信するという目的で実装されているため、無限長のストリームを処理できるCDNは稀です。そのため、あまり普及してませんが、時々このような配信形式を見かけることがあります。

HTTP-FLV実装の流れ

非同期読み込み

前回も登場した非同期読み込みクラスですが、前回はダンプを前提としてEOFの場合の処理を省いて掲載していました。今回はHTTP-FLV配信ということで、ある程度使い物になるようにエラー処理を書き足します。

実装としては、いままでresolveだけしか処理していなかったところでrejectを処理するようにして、明示的にEOFが入れられてデータが足りない場合にはrejectを呼ぶようにしています。

非同期読み取りクラス (クリックで展開)

async-byte-reader.ts

export default class AsyncByteReader {
  private buffers: Buffer[] = [];
  private offset = 0;
  private totals = 0;
  private eof = false;
  private promises: [number, (result: Buffer) => void, (error: Error) => void][] = [];

  private fulfill(): void {
    while (this.promises.length > 0) {
      const [length, resolve] = this.promises[0];
      if (this.totals < length) { break; }

      const drained = [];
      let remains = length;
      while (this.buffers.length > 0) {
        const buffer = this.buffers[0];
        const capacity = buffer.byteLength - this.offset;

        if (capacity > remains) {
          drained.push(buffer.subarray(this.offset, this.offset + remains));
          this.offset += remains;
          break;
        }

        drained.push(buffer.subarray(this.offset));
        this.buffers.shift();
        this.offset = 0;
        remains -= capacity;
      }

      this.totals -= length;
      resolve(Buffer.concat(drained));
      this.promises.shift();
    }

    if (!this.eof) { return; }
    while (this.promises.length > 0) {
      const [,,reject] = this.promises[0];
      reject(new Error('EOF Exception'));
      this.promises.shift();
    }
  }

  public feed(buffer: Buffer): void {
    if (this.eof) { return; }
    this.buffers.push(buffer);
    this.totals += buffer.byteLength;
    this.fulfill();
  }

  public feedEOF(): void {
    this.eof = true;
    this.fulfill();
  }
  public [Symbol.dispose](): void { this.feedEOF(); }

  public read(size: number): Promise<Buffer> {
    const { promise, resolve, reject } = Promise.withResolvers<Buffer>();
    this.promises.push([size, resolve, reject]);
    this.fulfill();
    return promise;
  }

  public async readUIntBE(length: number): Promise<number> {
    return (await this.read(length)).readUIntBE(0, length);
  }

  public async readUIntLE(length: number): Promise<number> {
    return (await this.read(length)).readUIntLE(0, length);
  }

  public async readU8(): Promise<number> {
    return this.readUIntBE(1);
  }

  public async readU16BE(): Promise<number> {
    return this.readUIntBE(2);
  }

  public async readU24BE(): Promise<number> {
    return this.readUIntBE(3);
  }

  public async readU32BE(): Promise<number> {
    return this.readUIntBE(4);
  }

  public async readU16LE(): Promise<number> {
    return this.readUIntLE(2);
  }

  public async readU24LE(): Promise<number> {
    return this.readUIntLE(3);
  }

  public async readU32LE(): Promise<number> {
    return this.readUIntLE(4);
  }
}

RTMP受信部分

せっかくなので、前回のRTMP受信部分は関数に切り出しましょう。こういう処理は内部でシステムが処理すべきメッセージを処理して、映像・音声・データのメッセージを返すジェネレータとして実装するとうまく抽象化できていい感じです。そうすれば、このジェネレータを読んだ側はfor await文でRTMPの映像・音声・データのメッセージを処理すればよく、関心が分離されます。

いま流行りのコンテナ化をする前提で以下の仮定のもと単純化して組んでしまいましょう。

  • 1つの固定のアプリケーションとストリームキーを受け付ける
  • 1接続だけを受け付けるように排他制御する
  • 同時に配信されそうになった時は先勝ち

そのように実装したのが以下のコードとなります。

rtmp-accepter.ts

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
} as const;

let lock = false;
export default async function* handle_rtmp(connection: Duplex, app: string, key: string, limit?: number): AsyncIterable<Message> {
  let locking = false;
  const bandwidthController = new AbortController();
  const reader = new AsyncByteReader();
  const estimator = new BandwidthEstimator(limit ?? Number.POSITIVE_INFINITY, bandwidthController);
  connection.pipe(new Writable({
    write(data, _, cb) { reader.feed(data); estimator.feed(data.byteLength); cb(); },
    destroy(err, cb) { reader.feedEOF(); cb(err); }
  }));

  try {
    /*
    * RTMPのハンドシェイクを処理する
    */
    {
      // C0/S0
      await reader.readU8(); // Read C0
      connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
      // C1/S1
      const c1 = await reader.read(1536); // read C1
      const c1_random = c1.subarray(8);
      const s1_random = randomBytes(1536 - 8);
      const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
      connection.write(s1); // write C1
      // C2/S2
      const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
      connection.write(s2); // write S2
      const c2 = await reader.read(1536); // read C2
      const c2_random_echo = c2.subarray(8);
      // Check Random Echo
      if (s1_random.byteLength !== c2_random_echo.byteLength || !s1_random.equals(c2_random_echo)) {
        throw new Error('Invalid Random Echo');
      }
    }
    /*
    * RTMPのメッセージを処理する
    */
    let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
    for await (const message of read_message(reader)) {
      if (bandwidthController.signal.aborted) { throw new Error('Bandwidth quota exceeded!', { cause: bandwidthController.signal.reason }); }

      // 本当は共通で処理するシステムメッセージ部分を書く、大体の打ち上げRTMPではなくても良いので省略

      switch (state) {
        case STATE.WAITING_CONNECT: {
          if (message.message_stream_id !== 0) { continue; }
          if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
          const command = read_amf0(message.data);

          const name = command[0];
          if (name !== 'connect') { continue; }
          const transaction_id = command[1];
          const appName = command[2]['app'];
          const connectAccepted = appName === app;

          const info = connectAccepted ? {
            code: 'NetConnection.Connect.Success',
            description: 'Connection succeeded.',
            data: { version: '3,5,7,7009', },
            objectEncoding: 0, // 0 = AMF0, 3 = AMF3
            level: 'status', // 正常系
          } : {
            code: 'NetConnection.Connect.Rejected',
            description: 'Connection rejected.',
            level: 'error', // 異常系
          };

          const result = write_amf0(
            '_result',
            transaction_id,
            {
              fmsVer: 'FMS/3,5,7,7009',
              capabilities: 31,
              mode: 1,
            },
            info
          );
          connection.write(write_message({
            message_type_id: MessageType.CommandAMF0,
            message_stream_id: 0,
            timestamp: 0,
            data: result,
          }));

          if (!connectAccepted) { return; }
          state = STATE.WAITING_CREATESTREAM;
          break;
        }
        case STATE.WAITING_CREATESTREAM: {
          if (message.message_stream_id !== 0) { continue; }
          if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
          const command = read_amf0(message.data);

          const name = command[0];
          if (name !== 'createStream') { continue; }
          const transaction_id = command[1];

           // message_stream_id は 0 が予約されている (今使ってる) ので 1 を利用する
          const result = write_amf0('_result', transaction_id, null, 1);
          connection.write(write_message({
            message_type_id: MessageType.CommandAMF0,
            message_stream_id: 0,
            timestamp: 0,
            data: result,
          }));

          state = STATE.WAITING_PUBLISH;
          break;
        }
        case STATE.WAITING_PUBLISH: {
          if (message.message_stream_id !== 1) { continue; }
          if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
          const command = read_amf0(message.data);

          const name = command[0];
          if (name !== 'publish') { continue; }
          const transaction_id = command[1];
          const streamKey = command[3];
          const publishAccepted = streamKey === key && !lock; // streamKey が合致していて、配信されてない場合は配信を許可する

          const info = publishAccepted ? {
            code: 'NetStream.Publish.Start',
            description: 'Publish Accepted',
            level: 'status' // 正常系
          } : {
            code: 'NetStream.Publish.Failed', // Permision Denied
            description: 'Publish Failed',
            level: 'error' // 異常系
          };

          const result = write_amf0('onStatus', transaction_id, null, info);
          connection.write(write_message({
            message_type_id: MessageType.CommandAMF0,
            message_stream_id: message.message_stream_id,
            timestamp: 0,
            data: result,
          }));

          if (!publishAccepted) { return; }
          locking = lock = true;
          state = STATE.PUBLISHED;
          break;
        }
        case STATE.PUBLISHED: {
          yield message;
          break;
        }
      }
    }
  } catch (e) {
    throw e;
  } finally {
    if (locking) { lock = false; }
    connection.destroy();
    estimator.destroy();
    reader.feedEOF();
  }
}

帯域制限

配信サーバとして使いものになるように実装するに当たって、クライアントがこちらの想定しているビットレートを超過しているか判断できると嬉しいです。配信コストが打ち上げ側負担であったり、トランスコードする場合ではビットレートの超過をあまり気にしなくても良いのですが、今回はそれ以外のケースも考えてビットレートの超過を判断したいと思います。

なので、ビットレート推定を行うコードも実装します。ビットレートはVBRの場合に増減があるので、移動平均を使ってやるとある程度滑らかな平均ビットレートを出すことができます。

実装としてはAbortControllerを受け取って、データのバイト数から平均ビットレートを算出し、移動平均が指定の値を超えたらabort()するという形です。まぁまぁ単純な実装である程度ワークします。

bandwidth-estimator.ts

export type BandwidthEstimatorOption = {
  intervalMillis: number;
  movingAverageLength: number;
}
export const BandwidthEstimatorOption = {
  from(option?: Partial<BandwidthEstimatorOption>): BandwidthEstimatorOption {
    return { intervalMillis: 100, movingAverageLength: 10, ... option };
  }
}

export default class BandwidthEstimator {
  private option: BandwidthEstimatorOption;
  private controller: AbortController;
  private limit: number;
  private timerId: NodeJS.Timeout | null = null;
  private readonly measureHandler = this.measure.bind(this);
  private estimates: number[] = [];
  private movingAverageLength: number;
  private totalBytes: number = 0;
  private previousTime: DOMHighResTimeStamp = performance.now();

  public constructor(limit: number, controller: AbortController, option?: Partial<BandwidthEstimatorOption>) {
    this.limit = limit;
    this.controller = controller;
    this.option = BandwidthEstimatorOption.from(option);
    this.movingAverageLength = this.option.movingAverageLength;
    this.timerId = setInterval(this.measureHandler, this.option.intervalMillis);
  }

  public feed(byteLength: number) {
    this.totalBytes += byteLength;
  }

  public estimate(): number {
    return this.estimates.reduce((sum, curr) => sum + curr, 0) / this.movingAverageLength;
  }

  public measure(): void {
    const currentTime = performance.now();
    const duration = (currentTime - this.previousTime) / 1000;
    const bandwidth = (this.totalBytes * 8) / duration;
    this.estimates.push(bandwidth);

    if (this.estimates.length >= this.movingAverageLength) {
      this.estimates.splice(0, this.estimates.length - this.movingAverageLength);
      const average = this.estimate();
      if (average >= this.limit) {
        this.controller.abort(new Error(`Bandwidth quota exceeded! limit: ${this.limit}, actual: ${average}`));
      }
    }

    this.previousTime = currentTime;
    this.totalBytes = 0;
  }

  public destroy(): void {
    if (this.timerId == null) { return; }
    clearInterval(this.timerId);
    this.timerId = null;
  }
}

HTTPサーバによるストリーミング配信

今回はHTTPの配信もNode.jsのhttpモジュールで行います。これはNode.jsの標準ライブラリだけ縛りという意味もありますが、Node.jsのStreamAPIで指定できるhighWaterMarkをHTTPサーバに指定可能なフレームワークがほぼないからです。StreamAPIのつまり具合は内部バッファの量がhighWaterMarkを超えてるかどうかで決まります。内部バッファの量がhighWaterMarkを超えると書き込みがfalseを返すため、内部バッファが詰まっているか判断できます。

今回はhighWaterMarkを超えた場合(書き込みがfalseを返した時)にはパケット詰まりで受信がうまくできてないと判断し、接続を切断するようにしています。このようにすることで、受信側が詰まっているがコネクションが切れてない、という状態で無制限に書き込み待ちデータをサーバ側に溜めないようにします。

また、HTTPでFLVを配信する上で注意する点として、RTMPの映像・音声の初期化用メッセージを視聴者の接続開始毎に送る必要があります。RTMPはTCPでありパケットロスを考慮しないため、MPEG-TSのように初期化用のデータを定期的に再送するようなことはしません。このため、初期化用メッセージを保存しておいて、接続毎にない場合は送ってあげる必要があります。

http-streaming.ts

const write_tag_header = (message: Message, previousTagSize: number): Buffer => {
  const header = Buffer.alloc(15);
  header.writeUIntBE(previousTagSize, 0, 4);
  header.writeUIntBE(message.message_type_id, 4, 1)
  header.writeUIntBE(message.data.byteLength, 5, 3);
  header.writeUInt8(Math.floor(message.timestamp / (2 ** 16)) % (2 ** 8),  8);
  header.writeUInt8(Math.floor(message.timestamp / (2 **  8)) % (2 ** 8),  9);
  header.writeUInt8(Math.floor(message.timestamp / (2 **  0)) % (2 ** 8), 10);
  header.writeUInt8(Math.floor(message.timestamp / (2 ** 24)) % (2 ** 8), 11);
  header.writeUIntBE(0, 12, 3);
  return header;
}

type StreamingInformation = [writeFn: (buffer: Buffer) => void, exitFn: () => void, priviousTagSize: number, initialized: boolean]
const streaming = new Map<number, StreamingInformation>();const handle = async (connection: Duplex) => {
  let onMetadataMessage: Message | null = null;
  let avcConfigMessage: Message | null = null;
  let aacConfigMessage: Message | null = null;

  try {
    for await (const message of handle_rtmp(connection, app, streamKey, bandwidth)) {
      const reader = new ByteReader(message.data);
      switch (message.message_type_id) {
        case MessageType.Video: {
          const codec = reader.readU8() & 0x0F;
          if (codec !== 0x07) { continue; } // Accept AVC
          const packetType = reader.readU8();
          if (packetType === 0) { avcConfigMessage = message; }
          break;
        }
        case MessageType.Audio: {
          const codec = reader.readU8() >> 4;
          if (codec !== 10) { continue; } // Accept AAC
          const packetType = reader.readU8();
          if (packetType === 0) { aacConfigMessage = message; }
          break;
        }
        case MessageType.DataAMF0: {
          const command = read_amf0(message.data);
          if (command.length !== 3) { continue; }
          if (command[0] !== '@setDataFrame') { continue; }
          if (command[1] !== 'onMetaData') { continue; }
          onMetadataMessage ={ ... message, data: write_amf0(command[1], command[2]) };
          break;
        }
        default: continue;
      }

      for (const stream of streaming.values()){
        const [write,,, initialize] = stream;
        if (initialize) {
          for (const message of [onMetadataMessage, avcConfigMessage, aacConfigMessage]) {
            if (message == null) { continue; }
            const previousTagSize = stream[2];
            const header = write_tag_header(message, previousTagSize)
            write(header);
            write(message.data);
            stream[2] = (header.byteLength + message.data.byteLength) - 4;
          }
        }
        {
          const previousTagSize = stream[2];
          const header = write_tag_header(message, previousTagSize)
          write(header);
          write(message.data);
          stream[2] = (header.byteLength + message.data.byteLength) - 4;
        }
        stream[3] = false;
      };
    }
  } catch (e) {
    console.error(e);
  } finally {
    for (const [,end] of streaming.values()) { end(); }
  }
}

const rtmp_server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle(connection);
});
rtmp_server.listen(port);

let viewers = 0;
const web_server = http.createServer({ highWaterMark }, (req, res) => {
  if (req.url == null) {
    res.writeHead(404, { "access-control-allow-origin": '*' });
    res.end();
    return;
  }
  const url = new URL(req.url, `http://localhost:${web}`);
  if (!(req.method === 'GET' && url.pathname === `/${app}/${streamKey}`)) {
    res.writeHead(404, { "access-control-allow-origin": '*' });
    res.end();
    return;
  }
  const viewer = viewers++;
  res.writeHead(200, {
    "content-type": 'application/octet-stream',
    "access-control-allow-origin": '*',
  });

  const write = (chunk: Buffer) => {
    if (res.closed) { return; }
    if (!res.write(chunk) && highWaterMark != null) { res.destroy(); }
  };
  write(Buffer.from([
    0x46, 0x4C, 0x56, // Signature (FLV)
    1,               // version
    4 | 1,           // 4: Audio Present, 1: Video Present
    0, 0, 0, 9       // Header Bytes
  ]));

  const exit = () => { res.end(); streaming.delete(viewer); }
  const entry = [write, exit, 0, true] satisfies StreamingInformation;
  streaming.set(viewer, entry);

  req.on('close', exit);
  res.on('close', exit);
  req.on('error', exit);
  res.on('error', exit);
});
web_server.listen({ port: web });

受信の仕方

HTTP-FLVはffplayなどのクライアントで受信ができます。MSEを使ったブラウザで受信するプレイヤーもあるようです。

今回のコードで受信する場合は、ffplayで以下のように指定します。${web}はWebサーバのポート、${app}${streamKey}は指定したアプリ名とストリームキーです。

HTTP-FLV受信コマンド(ffplay)

ffplay -f flv http://localhost:${web}/${app}/${streamKey}

このコマンドでRTMPで打ち上げた内容と同じ内容が出力されたら、HTTP-FLV配信成功です。

まとめ

今度はHTTP-FLVを実装して理解するというテーマでやってみましたが、いかがでしたでしょうか。HTTPのストリーミング配信でFLVを送るというのは、RTMPを片方向通信にしたものだと考えると、ある程度は理にかなっているなと感じてもらえるんじゃないかと思います。こちらもRTMP同様にコードをみて、単純に実装できるんだなと思ってもらえれば幸いです。

ちなみに、今回はコード行数を数えると800行弱でした。一応、Node.jsの標準ライブラリだけ使うようにしていますが、例示用のコードとしては行数は多くなく実装できているんじゃないかと思います。

おまけ

今回のブログ記事で紹介したコード片を1ファイルにまとめてみました。このTypeScriptのコードをファイルに貼り付ければ、Node24ならNode.jsだけで動きますので、是非試してHTTP-FLV配信をやってみてください。

1ファイルまとめ版 (クリックで展開)

bundled.mts

import net from "node:net";
import http from "node:http"
import { randomBytes } from 'node:crypto'
import { Duplex, Writable } from "node:stream";
import { parseArgs } from "node:util";

class AsyncByteReader {
  private buffers: Buffer[] = [];
  private offset = 0;
  private totals = 0;
  private eof = false;
  private promises: [number, (result: Buffer) => void, (error: Error) => void][] = [];

  private fulfill(): void {
    while (this.promises.length > 0) {
      const [length, resolve] = this.promises[0];
      if (this.totals < length) { break; }

      const drained = [];
      let remains = length;
      while (this.buffers.length > 0) {
        const buffer = this.buffers[0];
        const capacity = buffer.byteLength - this.offset;

        if (capacity > remains) {
          drained.push(buffer.subarray(this.offset, this.offset + remains));
          this.offset += remains;
          break;
        }

        drained.push(buffer.subarray(this.offset));
        this.buffers.shift();
        this.offset = 0;
        remains -= capacity;
      }

      this.totals -= length;
      resolve(Buffer.concat(drained));
      this.promises.shift();
    }

    if (!this.eof) { return; }
    while (this.promises.length > 0) {
      const [,,reject] = this.promises[0];
      reject(new Error('EOF Exception'));
      this.promises.shift();
    }
  }

  public feed(buffer: Buffer): void {
    if (this.eof) { return; }
    this.buffers.push(buffer);
    this.totals += buffer.byteLength;
    this.fulfill();
  }

  public feedEOF(): void {
    this.eof = true;
    this.fulfill();
  }

  public read(size: number): Promise<Buffer> {
    const { promise, resolve, reject } = Promise.withResolvers<Buffer>();
    this.promises.push([size, resolve, reject]);
    this.fulfill();
    return promise;
  }

  public async readUIntBE(length: number): Promise<number> {
    return (await this.read(length)).readUIntBE(0, length);
  }

  public async readUIntLE(length: number): Promise<number> {
    return (await this.read(length)).readUIntLE(0, length);
  }

  public readU8(): Promise<number> {
    return this.readUIntBE(1);
  }

  public readU16BE(): Promise<number> {
    return this.readUIntBE(2);
  }

  public readU24BE(): Promise<number> {
    return this.readUIntBE(3);
  }

  public readU32BE(): Promise<number> {
    return this.readUIntBE(4);
  }

  public readU16LE(): Promise<number> {
    return this.readUIntLE(2);
  }

  public readU24LE(): Promise<number> {
    return this.readUIntLE(3);
  }

  public readU32LE(): Promise<number> {
    return this.readUIntLE(4);
  }
}

class ByteReader {
  private buffer: Buffer;
  private offset = 0;

  public constructor(buffer: Buffer) {
    this.buffer = buffer;
  }

  public isEOF(): boolean {
    return this.offset >= this.buffer.byteLength;
  }

  public read(length: number): Buffer {
    if (length < 0) { length = this.buffer.byteLength - this.offset; }
    const value = this.buffer.subarray(this.offset, this.offset + length);
    this.offset += length;
    return value;
  }

  public readUIntBE(length: number): number {
    return this.read(length).readUIntBE(0, length);
  }

  public readIntBE(length: number): number {
    return this.read(length).readIntBE(0, length);
  }

  public readU8(): number {
    return this.readUIntBE(1);
  }

  public readU16BE(): number {
    return this.readUIntBE(2);
  }

  public readU24BE(): number {
    return this.readUIntBE(3);
  }

  public readU32BE(): number {
    return this.readUIntBE(4);
  }

  public readI16BE(): number {
    return this.readIntBE(2);
  }

  public readF64BE(): number {
    return this.read(8).readDoubleBE(0);
  }
}

const UTF8Decoder = new TextDecoder('utf-8');
const scriptend: unique symbol = Symbol();

const read_string = (reader: ByteReader): string => {
  const length = reader.readU16BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_longstring = (reader: ByteReader): string => {
  const length = reader.readU32BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_object = (reader: ByteReader): Record<string, any> => {
  const object: Record<string, any> = {};
  while (true) {
    const name = read_string(reader);
    const val = read_value(reader);
    if (val === scriptend) { return object; }
    object[name] = val;
  }
}

const read_mixedarray = (reader: ByteReader): Record<string, any>  => {
  reader.readU32BE(); // length
  return read_object(reader);
}

const read_strictarray = (reader: ByteReader): any[] => {
  const length = reader.readU32BE();
  const array = [];
  for (let i = 0; i < length; i++) {
    array.push(read_value(reader));
  }
  return array;
}

const read_date = (reader: ByteReader): Date => {
  const timestamp = reader.readF64BE();
  const localtimeoffset = reader.readI16BE();
  return new Date(timestamp);
}

const read_value = (reader: ByteReader): any => {
  const tag = reader.readU8();
  switch (tag) {
    case 0: return reader.readF64BE();
    case 1: return reader.readU8() !== 0;
    case 2: return read_string(reader);
    case 3: return read_object(reader);
    case 4: throw new Error('Unsupported Tag: 4 (movie clip)');
    case 5: return null;
    case 6: return undefined;
    case 7: throw new Error('Unsupported Tag: 7 (reference)');
    case 8: return read_mixedarray(reader);
    case 9: return scriptend;
    case 10: return read_strictarray(reader);
    case 11: return read_date(reader);
    case 12: return read_longstring(reader);
  }
  throw new Error(`Invalid tag: ${tag}`);
}

const read_amf0 = (data: Buffer): any[] => {
  const reader = new ByteReader(data);
  const result = [];
  while (!reader.isEOF()) {
    result.push(read_value(reader));
  }
  return result;
}

const UTF8Encoder = new TextEncoder();

const write_number = (data: number): Buffer => {
  const buffer = Buffer.alloc(8)
  buffer.writeDoubleBE(data);
  return buffer;
}

const write_string = (data: string): Buffer => {
  const utf8 = Buffer.from(UTF8Encoder.encode(data));
  const length = Buffer.alloc(2);
  length.writeUInt16BE(utf8.byteLength);
  return Buffer.concat([length, utf8]);
}

const write_object = (obj: Record<string, any>): Buffer => {
  return Buffer.concat([... Object.entries(obj).map(([k, v]) => Buffer.concat([write_string(k), write_value(v)])), Buffer.from([0x00, 0x00, 0x09])]);
}

const write_array = (data: any[]): Buffer => {
  const length = Buffer.alloc(4);
  length.writeUInt32BE(data.length);
  return Buffer.concat([length, ... data.map((datum) => write_value(datum))]);
}

const write_date = (data: Date): Buffer => {
  const buffer = Buffer.alloc(10);
  buffer.writeDoubleBE(data.getTime(), 0);
  buffer.writeUInt16BE(0, 8);
  return buffer;
}

const write_value = (data: any): Buffer => {
  if (data === null) { return Buffer.from([0x05]); }
  if (data === undefined) { return Buffer.from([0x06]); }
  if (Array.isArray(data)) { return Buffer.concat([Buffer.from([0x0a]), write_array(data)]); }
  if (data instanceof Date) { return Buffer.concat([Buffer.from([0x0b]), write_date(data)]); }
  switch (typeof data) {
    case 'number': return Buffer.concat([Buffer.from([0x00]), write_number(data)]);
    case 'boolean': return Buffer.from([0x01, data ? 1 : 0]);
    case 'string': return  Buffer.concat([Buffer.from([0x02]), write_string(data)]);
    case 'object': return Buffer.concat([Buffer.from([0x03]), write_object(data)]);
    default: return Buffer.from([]);
  }
}

const write_amf0 = (... data: any[]): Buffer => {
  return Buffer.concat(data.map((datum) => write_value(datum)));
}

type MessageInformation = {
  message_type_id: number;
  message_stream_id: number
  message_length: number;
  timestamp: number;
  timestamp_delta: number | null;
  is_extended_timestamp: boolean;
}

const MessageType = {
  SetChunkSize: 1,
  Abort: 2,
  Acknowledgement: 3,
  UserControl: 4,
  WindowAcknowledgementSize: 5,
  SetPeerBandwidth: 6,
  Audio: 8,
  Video: 9,
  DataAMF3: 15,
  CommandAMF3: 17,
  DataAMF0: 18,
  CommandAMF0: 20,
} as const;

type Message = Omit<MessageInformation, 'timestamp_delta' | 'is_extended_timestamp'> & {
  data: Buffer;
}

async function* read_message(reader: AsyncByteReader): AsyncIterable<Message> {
  let chunk_maximum_size = 128; // システムメッセージにより変化する
  const informations = new Map<number, MessageInformation>();
  const chunks = new Map<number, Buffer[]>();
  const lengths = new Map<number, number>();

  while (true) {
    const basic = await reader.readU8();
    const fmt = (basic & 0b11000000) >> 6;
    let cs_id = (basic & 0b00111111) >> 0;
    switch (cs_id) {
      case 0: cs_id = 64 + await reader.readU8(); break;
      case 1: cs_id = 64 + await reader.readU16LE(); break;
    }
    if (fmt !== 3) { chunks.set(cs_id, []); lengths.set(cs_id, 0); }
    const chunk_buffers = chunks.get(cs_id) ?? [];
    const chunk_length = lengths.get(cs_id) ?? 0;
    let information = informations.get(cs_id);

    let timestamp = fmt !== 3 ? await reader.readU24BE() : information?.timestamp;
    const message_length = fmt === 0 || fmt === 1 ? await reader.readU24BE() : information?.message_length;
    const message_type_id = fmt === 0 || fmt === 1 ? await reader.readU8() : information?.message_type_id;
    const message_stream_id = fmt === 0 ? await reader.readU32LE() : information?.message_stream_id;
    // 情報がない時は何もできないしパースもできない。今回は簡単のため受信をやめて return する
    const is_extended_timestamp = fmt !== 3 ? timestamp === 0xFFFFFF : information?.is_extended_timestamp;
    if (timestamp == null || message_length == null || message_type_id == null || message_stream_id == null || is_extended_timestamp == null) { return; }
    const extended_timestamp = is_extended_timestamp ? await reader.readU32BE() : null;

    let timestamp_delta = null;
    if (fmt === 1 || fmt === 2) {
      if (information?.timestamp == null) { return; }
      timestamp_delta = extended_timestamp ?? timestamp;
      timestamp = information.timestamp + timestamp_delta;
    } else if (fmt === 3) {
      timestamp_delta = extended_timestamp ?? information?.timestamp_delta ?? timestamp;
      timestamp += (chunk_length > 0) ? 0 : timestamp_delta;
    } else {
      timestamp = extended_timestamp ?? timestamp;
    }

    information = { message_type_id, message_stream_id, message_length, timestamp, timestamp_delta, is_extended_timestamp } satisfies MessageInformation;
    const chunk = await reader.read(Math.min(message_length - chunk_length, chunk_maximum_size));
    const length = chunk_length + chunk.byteLength;
    if (length >= message_length) {
      if (length === message_length) {
        const data = Buffer.concat([... chunk_buffers, chunk])
        if (message_type_id === MessageType.SetChunkSize) {
          chunk_maximum_size = Math.max(1, Math.min(2 ** 31 - 1, data.readUInt32BE(0)));
        } else {
          const { timestamp_delta, is_extended_timestamp, ... message } = information;
          yield { ... message, data };
        }
      }
      chunks.set(cs_id, []);
      lengths.set(cs_id, 0);
    } else {
      chunks.set(cs_id, [... chunk_buffers, chunk]);
      lengths.set(cs_id, length);
    }
    informations.set(cs_id, information);
  }
}

const write_message = (message: Omit<Message, 'message_length'>): Buffer => {
  let chunk_maximum_size = 128;
  const result: Buffer[] = [];

  for (let i = 0; i < message.data.byteLength; i += chunk_maximum_size) {
    const chunk = message.data.subarray(i, Math.min(message.data.byteLength, i + chunk_maximum_size));
    const cs_id = 3; // こちらからはシステム系のメッセージしか送らないので 3 に固定する
    const fmt = i === 0 ? 0 : 3; // 簡単化のため 0 か 3 以外は使わない
    const basic = Buffer.from([(fmt << 6) | cs_id]);

    if (fmt === 3) {
      result.push(basic, chunk);
      continue;
    }

    const header = Buffer.alloc(message.timestamp >= 0xFFFFFF ? 15 : 11);
    header.writeUIntBE(Math.min(message.timestamp, 0xFFFFFF), 0, 3);
    header.writeUIntBE(message.data.byteLength, 3, 3);
    header.writeUIntBE(message.message_type_id, 6, 1);
    header.writeUIntLE(message.message_stream_id, 7, 4);
    if (message.timestamp >= 0xFFFFFF) {
      header.writeUIntBE(message.timestamp, 11, 4);
    }

    result.push(basic, header, chunk);
  }

  return Buffer.concat(result);
}

type BandwidthEstimatorOption = {
  intervalMillis: number;
  movingAverageLength: number;
}
const BandwidthEstimatorOption = {
  from(option?: Partial<BandwidthEstimatorOption>): BandwidthEstimatorOption {
    return { intervalMillis: 100, movingAverageLength: 10, ... option };
  }
}

class BandwidthEstimator {
  private option: BandwidthEstimatorOption;
  private controller: AbortController;
  private limit: number;
  private timerId: NodeJS.Timeout | null = null;
  private readonly measureHandler = this.measure.bind(this);
  private estimates: number[] = [];
  private movingAverageLength: number;
  private totalBytes: number = 0;
  private previousTime: DOMHighResTimeStamp = performance.now();

  public constructor(limit: number, controller: AbortController, option?: Partial<BandwidthEstimatorOption>) {
    this.limit = limit;
    this.controller = controller;
    this.option = BandwidthEstimatorOption.from(option);
    this.movingAverageLength = this.option.movingAverageLength;
    this.timerId = setInterval(this.measureHandler, this.option.intervalMillis);
  }

  public feed(byteLength: number) {
    this.totalBytes += byteLength;
  }

  public estimate(): number {
    return this.estimates.reduce((sum, curr) => sum + curr, 0) / this.movingAverageLength;
  }

  public measure(): void {
    const currentTime = performance.now();
    const duration = (currentTime - this.previousTime) / 1000;
    const bandwidth = (this.totalBytes * 8) / duration;
    this.estimates.push(bandwidth);

    if (this.estimates.length >= this.movingAverageLength) {
      this.estimates.splice(0, this.estimates.length - this.movingAverageLength);
      const average = this.estimate();
      if (average >= this.limit) {
        this.controller.abort(new Error(`Bandwidth quota exceeded! limit: ${this.limit}, actual: ${average}`));
      }
    }

    this.previousTime = currentTime;
    this.totalBytes = 0;
  }

  public destroy(): void {
    if (this.timerId == null) { return; }
    clearInterval(this.timerId);
    this.timerId = null;
  }
}

const options = {
  rtmp: {
    type: 'string',
    default: '1935',
  },
  web: {
    type: 'string',
    default: '8000',
  },
  app: {
    type: 'string'
  },
  streamKey: {
    type: 'string'
  },
  highWaterMark: {
    type: 'string'
  },
  bandwidth: {
    type: 'string'
  }
} as const ;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.rtmp, 10))) {
  console.error('Please Specify valid port number'); process.exit(-1);
}
if (Number.isNaN(Number.parseInt(args.web, 10))) {
  console.error('Please Specify valid port number'); process.exit(-1);
}
if (args.app == null) {
  console.error('Please Specify valid app'); process.exit(-1);
}
if (args.streamKey == null) {
  console.error('Please Specify valid streamKey'); process.exit(-1);
}
if (args.highWaterMark != null && Number.isNaN(Number.parseInt(args.highWaterMark, 10))) {
  console.error('Please Specify valid highwatermark'); process.exit(-1);
}
if (args.bandwidth != null && Number.isNaN(Number.parseInt(args.bandwidth, 10))) {
  console.error('Please Specify valid bandwidth'); process.exit(-1);
}
const port = Number.parseInt(args.rtmp, 10);
const web = Number.parseInt(args.web, 10);
const app = args.app;
const streamKey = args.streamKey;
const highWaterMark = args.highWaterMark != null ? Number.parseInt(args.highWaterMark, 10) : undefined;
const bandwidth = args.bandwidth != null ? Number.parseInt(args.bandwidth, 10) : undefined;

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
} as const;

let lock = false;
async function* handle_rtmp(connection: Duplex, app: string, key: string, limit?: number): AsyncIterable<Message> {
  let locking = false;
  const bandwidthController = new AbortController();
  const reader = new AsyncByteReader();
  const estimator = new BandwidthEstimator(limit ?? Number.POSITIVE_INFINITY, bandwidthController);
  connection.pipe(new Writable({
    write(data, _, cb) { reader.feed(data); estimator.feed(data.byteLength); cb(); },
    destroy(err, cb) { reader.feedEOF(); cb(err); }
  }));

  try {
    /*
    * RTMPのハンドシェイクを処理する
    */
    {
      // C0/S0
      await reader.readU8(); // Read C0
      connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
      // C1/S1
      const c1 = await reader.read(1536); // read C1
      const c1_random = c1.subarray(8);
      const s1_random = randomBytes(1536 - 8);
      const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
      connection.write(s1); // write C1
      // C2/S2
      const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
      connection.write(s2); // write S2
      const c2 = await reader.read(1536); // read C2
      const c2_random_echo = c2.subarray(8);
      // Check Random Echo
      if (s1_random.byteLength !== c2_random_echo.byteLength || !s1_random.equals(c2_random_echo)) {
        throw new Error('Invalid Random Echo');
      }
    }
    /*
    * RTMPのメッセージを処理する
    */
    let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
    for await (const message of read_message(reader)) {
      if (bandwidthController.signal.aborted) { throw new Error('Bandwidth quota exceeded!', { cause: bandwidthController.signal.reason }); }

      // 本当は共通で処理するシステムメッセージ部分を書く、大体の打ち上げRTMPではなくても良いので省略

      switch (state) {
        case STATE.WAITING_CONNECT: {
          if (message.message_stream_id !== 0) { continue; }
          if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
          const command = read_amf0(message.data);

          const name = command[0];
          if (name !== 'connect') { continue; }
          const transaction_id = command[1];
          const appName = command[2]['app'];
          const connectAccepted = appName === app;

          const info = connectAccepted ? {
            code: 'NetConnection.Connect.Success',
            description: 'Connection succeeded.',
            data: { version: '3,5,7,7009', },
            objectEncoding: 0, // 0 = AMF0, 3 = AMF3
            level: 'status', // 正常系
          } : {
            code: 'NetConnection.Connect.Rejected',
            description: 'Connection rejected.',
            level: 'error', // 異常系
          };

          const result = write_amf0(
            '_result',
            transaction_id,
            {
              fmsVer: 'FMS/3,5,7,7009',
              capabilities: 31,
              mode: 1,
            },
            info
          );
          connection.write(write_message({
            message_type_id: MessageType.CommandAMF0,
            message_stream_id: 0,
            timestamp: 0,
            data: result,
          }));

          if (!connectAccepted) { return; }
          state = STATE.WAITING_CREATESTREAM;
          break;
        }
        case STATE.WAITING_CREATESTREAM: {
          if (message.message_stream_id !== 0) { continue; }
          if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
          const command = read_amf0(message.data);

          const name = command[0];
          if (name !== 'createStream') { continue; }
          const transaction_id = command[1];

           // message_stream_id は 0 が予約されている (今使ってる) ので 1 を利用する
          const result = write_amf0('_result', transaction_id, null, 1);
          connection.write(write_message({
            message_type_id: MessageType.CommandAMF0,
            message_stream_id: 0,
            timestamp: 0,
            data: result,
          }));

          state = STATE.WAITING_PUBLISH;
          break;
        }
        case STATE.WAITING_PUBLISH: {
          if (message.message_stream_id !== 1) { continue; }
          if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
          const command = read_amf0(message.data);

          const name = command[0];
          if (name !== 'publish') { continue; }
          const transaction_id = command[1];
          const streamKey = command[3];
          const publishAccepted = streamKey === key && !lock; // streamKey が合致していて、配信されてない場合は配信を許可する

          const info = publishAccepted ? {
            code: 'NetStream.Publish.Start',
            description: 'Publish Accepted',
            level: 'status' // 正常系
          } : {
            code: 'NetStream.Publish.Failed', // Permision Denied
            description: 'Publish Failed',
            level: 'error' // 異常系
          };

          const result = write_amf0('onStatus', transaction_id, null, info);
          connection.write(write_message({
            message_type_id: MessageType.CommandAMF0,
            message_stream_id: message.message_stream_id,
            timestamp: 0,
            data: result,
          }));

          if (!publishAccepted) { return; }
          locking = lock = true;
          state = STATE.PUBLISHED;
          break;
        }
        case STATE.PUBLISHED: {
          yield message;
          break;
        }
      }
    }
  } catch (e) {
    throw e;
  } finally {
    if (locking) { lock = false; }
    connection.destroy();
    estimator.destroy();
    reader.feedEOF();
  }
}

const write_tag_header = (message: Message, previousTagSize: number): Buffer => {
  const header = Buffer.alloc(15);
  header.writeUIntBE(previousTagSize, 0, 4);
  header.writeUIntBE(message.message_type_id, 4, 1)
  header.writeUIntBE(message.data.byteLength, 5, 3);
  header.writeUInt8(Math.floor(message.timestamp / (2 ** 16)) % (2 ** 8),  8);
  header.writeUInt8(Math.floor(message.timestamp / (2 **  8)) % (2 ** 8),  9);
  header.writeUInt8(Math.floor(message.timestamp / (2 **  0)) % (2 ** 8), 10);
  header.writeUInt8(Math.floor(message.timestamp / (2 ** 24)) % (2 ** 8), 11);
  header.writeUIntBE(0, 12, 3);
  return header;
}

type StreamingInformation = [writeFn: (buffer: Buffer) => void, exitFn: () => void, priviousTagSize: number, initialized: boolean]
const streaming = new Map<number, StreamingInformation>();
const handle = async (connection: Duplex) => {
  let onMetadataMessage: Message | null = null;
  let avcConfigMessage: Message | null = null;
  let aacConfigMessage: Message | null = null;

  try {
    for await (const message of handle_rtmp(connection, app, streamKey, bandwidth)) {
      const reader = new ByteReader(message.data);
      switch (message.message_type_id) {
        case MessageType.Video: {
          const codec = reader.readU8() & 0x0F;
          if (codec !== 0x07) { continue; } // Accept AVC
          const packetType = reader.readU8();
          if (packetType === 0) { avcConfigMessage = message; }
          break;
        }
        case MessageType.Audio: {
          const codec = reader.readU8() >> 4;
          if (codec !== 10) { continue; } // Accept AAC
          const packetType = reader.readU8();
          if (packetType === 0) { aacConfigMessage = message; }
          break;
        }
        case MessageType.DataAMF0: {
          const command = read_amf0(message.data);
          if (command.length !== 3) { continue; }
          if (command[0] !== '@setDataFrame') { continue; }
          if (command[1] !== 'onMetaData') { continue; }
          onMetadataMessage ={ ... message, data: write_amf0(command[1], command[2]) };
          break;
        }
        default: continue;
      }

      for (const stream of streaming.values()){
        const [write,,, initialize] = stream;
        if (initialize) {
          for (const message of [onMetadataMessage, avcConfigMessage, aacConfigMessage]) {
            if (message == null) { continue; }
            const previousTagSize = stream[2];
            const header = write_tag_header(message, previousTagSize)
            write(header);
            write(message.data);
            stream[2] = (header.byteLength + message.data.byteLength) - 4;
          }
        }
        {
          const previousTagSize = stream[2];
          const header = write_tag_header(message, previousTagSize)
          write(header);
          write(message.data);
          stream[2] = (header.byteLength + message.data.byteLength) - 4;
        }
        stream[3] = false;
      };
    }
  } catch (e) {
    console.error(e);
  } finally {
    for (const [,end] of streaming.values()) { end(); }
  }
}

const rtmp_server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle(connection);
});
rtmp_server.listen(port);

let viewers = 0;
const web_server = http.createServer({ highWaterMark }, (req, res) => {
  if (req.url == null) {
    res.writeHead(404, { "access-control-allow-origin": '*' });
    res.end();
    return;
  }
  const url = new URL(req.url, `http://localhost:${web}`);
  if (!(req.method === 'GET' && url.pathname === `/${app}/${streamKey}`)) {
    res.writeHead(404, { "access-control-allow-origin": '*' });
    res.end();
    return;
  }
  const viewer = viewers++;
  res.writeHead(200, {
    "content-type": 'application/octet-stream',
    "access-control-allow-origin": '*',
  });

  const write = (chunk: Buffer) => {
    if (res.closed) { return; }
    if (!res.write(chunk) && highWaterMark != null) { res.destroy(); }
  };
  write(Buffer.from([
    0x46, 0x4C, 0x56, // Signature (FLV)
    1,               // version
    4 | 1,           // 4: Audio Present, 1: Video Present
    0, 0, 0, 9       // Header Bytes
  ]));

  const exit = () => { res.end(); streaming.delete(viewer); }
  const entry = [write, exit, 0, true] satisfies StreamingInformation;
  streaming.set(viewer, entry);

  req.on('close', exit);
  res.on('close', exit);
  req.on('error', exit);
  res.on('error', exit);
});
web_server.listen({ port: web });