PLAY DEVELOPERS BLOG

The tech blog run by PLAY, inc., the company behind some of Japan's largest video streaming services, including Hulu and TVer.

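Understanding Video Streaming Technology by Implementing It: HLS Delivery

Hello, this is Ishikawa from the Technology Promotion Office, PLAY CLOUD Division. This is the fourth article in the "Understanding Video Streaming Technology by Implementing It" series. The previous article is here.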

developers.play.jp

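This time, we take the program from the previous article, which delivered RTMP as HTTP-TS, and change it to deliver HLS, the de facto standard for internet streaming. Once the transmuxing to MPEG-TS from the previous article is in place, moving to HLS is quick, and seeing your own stream play over HLS is especially rewarding. Let's build everything ourselves, right up to delivery over the de facto standard protocol.

So, let's implement the whole path from RTMP to HLS playback and understand how it works.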

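What is HLS?

HLS (HTTP Live Streaming) is a streaming protocol specified by Apple. It delivers video by chopping it into segments and listing links to those segments in a list of links called a media playlist. It is simple to describe and also simple to implement, so it is supported by many OVPs (Online Video Platforms) and can be played on a wide range of clients.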

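It is no exaggeration to say that HLS is the current de facto standard for video streaming. Besides the simplicity of the protocol mentioned above, a major reason is that HLS is the standard on Apple devices. For example, Safari on iOS 17.1 and later can use MSE in the form of ManagedMediaSource, but before that there was no option other than HLS on iPhone.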

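Next, let's go over the elements that appear in HLS: segments, media playlists, and multivariant playlists.

Segments

An HLS segment is the unit that makes up the media data in HLS.
The container is MPEG-TS, fMP4, or Packed Audio.

It is roughly correct to think of a segment as a chopped-up piece of the video. Originally HLS used MPEG-TS, which can be cut at any point without trouble, and fMP4 was later supported as part of CMAF support. MPEG-TS is a broadcast-oriented format that may be cut anywhere, so splitting a video takes little effort, which presumably made it convenient for HLS.

At present, largely for compatibility with older devices, it is common to use MPEG-TS for HLS and CMAF (fMP4) for MPEG-DASH. As a result, MPEG-TS is by far the more common sight in HLS, and fMP4 is somewhat rare. That said, fMP4 is sometimes used in HLS too, to support newer codecs (HEVC, AV1, FLAC, and so on) or to enable multi-DRM over HLS.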

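Media Playlist

An HLS media playlist is the unit that manages a single video being delivered. Put simply, it lists links to all of the video's segments, each annotated with metadata. The player fetches the segments from these links over HTTP and plays them.

Example media playlist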

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:4.004,
https://www.example.com/manifest_1_1.ts
#EXTINF:4.004,
https://www.example.com/manifest_1_2.ts
#EXTINF:4.004,
https://www.example.com/manifest_1_3.ts

For details on these tags, see the following article on this tech blog.

developers.play.jp
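
To make the structure of a media playlist concrete, here is a minimal sketch of a parser that pairs each #EXTINF duration with the URI on the following line. The type names and the function parseMediaPlaylist are hypothetical names introduced for this illustration, and it only handles the tags used in the example above, not the full specification.

```typescript
// Hypothetical types for this illustration only.
type MediaSegment = { duration: number; uri: string };
type ParsedMediaPlaylist = {
  targetDuration: number | null;
  mediaSequence: number;
  segments: MediaSegment[];
};

const TARGETDURATION = '#EXT-X-TARGETDURATION:';
const MEDIA_SEQUENCE = '#EXT-X-MEDIA-SEQUENCE:';
const EXTINF = '#EXTINF:';

const parseMediaPlaylist = (text: string): ParsedMediaPlaylist => {
  // When #EXT-X-MEDIA-SEQUENCE is absent, the first segment is number 0.
  const playlist: ParsedMediaPlaylist = { targetDuration: null, mediaSequence: 0, segments: [] };
  let pendingDuration: number | null = null;

  for (const raw of text.split(/\r?\n/)) {
    const line = raw.trim();
    if (line.startsWith(TARGETDURATION)) {
      playlist.targetDuration = Number.parseInt(line.slice(TARGETDURATION.length), 10);
    } else if (line.startsWith(MEDIA_SEQUENCE)) {
      playlist.mediaSequence = Number.parseInt(line.slice(MEDIA_SEQUENCE.length), 10);
    } else if (line.startsWith(EXTINF)) {
      // "#EXTINF:<duration>,[<title>]": keep only the part before the comma.
      const rest = line.slice(EXTINF.length);
      const comma = rest.indexOf(',');
      pendingDuration = Number.parseFloat(comma >= 0 ? rest.slice(0, comma) : rest);
    } else if (line !== '' && !line.startsWith('#') && pendingDuration != null) {
      // A non-tag line after #EXTINF is the segment URI.
      playlist.segments.push({ duration: pendingDuration, uri: line });
      pendingDuration = null;
    }
  }
  return playlist;
};
```

Feeding it the example playlist above would yield a media sequence of 1 and three segments of 4.004 seconds each.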

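Multivariant Playlist

A multivariant playlist groups multiple HLS media playlists together. It serves as the entry point of an HLS delivery and is used to tie together multiple media, for example to switch codecs, resolutions, or audio tracks. The player chooses one of the media playlists listed in it and plays it.

Example multivariant playlist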

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=1091244
https://www.example.com/manifest_1.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2382397
https://www.example.com/manifest_2.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=3859904
https://www.example.com/manifest_3.m3u8

A media playlist can also be played on its own without a multivariant playlist, so the multivariant playlist is not mandatory.
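
As an illustration of how a player might choose among the listed media playlists, here is a minimal sketch that reads the BANDWIDTH attribute of each #EXT-X-STREAM-INF entry and picks the highest variant that fits an estimated throughput. The names Variant, parseVariants, and selectVariant are hypothetical, and real players use far more elaborate adaptation logic than this.

```typescript
// Hypothetical type for this illustration only.
type Variant = { bandwidth: number; uri: string };

const STREAM_INF = '#EXT-X-STREAM-INF:';

const parseVariants = (text: string): Variant[] => {
  const variants: Variant[] = [];
  let pendingBandwidth: number | null = null;

  for (const raw of text.split(/\r?\n/)) {
    const line = raw.trim();
    if (line.startsWith(STREAM_INF)) {
      // Match BANDWIDTH only at an attribute boundary so AVERAGE-BANDWIDTH is not picked up.
      const value = /(?:^|,)BANDWIDTH=(\d+)/.exec(line.slice(STREAM_INF.length))?.[1];
      pendingBandwidth = value != null ? Number.parseInt(value, 10) : null;
    } else if (line !== '' && !line.startsWith('#') && pendingBandwidth != null) {
      // The URI line right after #EXT-X-STREAM-INF names the media playlist.
      variants.push({ bandwidth: pendingBandwidth, uri: line });
      pendingBandwidth = null;
    }
  }
  return variants;
};

// Pick the highest-bandwidth variant that fits; fall back to the lowest one.
const selectVariant = (variants: Variant[], availableBps: number): Variant | null => {
  const sorted = [...variants].sort((a, b) => a.bandwidth - b.bandwidth);
  let selected: Variant | null = null;
  for (const variant of sorted) {
    if (selected == null || variant.bandwidth <= availableBps) { selected = variant; }
  }
  return selected;
};
```

With the example above and an estimated throughput of 3 Mbps, this sketch would choose manifest_2.m3u8.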

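Live Streaming with HLS

For live streaming over HLS, each segment is placed on the origin and then appended to the media playlist. The playlist cannot grow forever, though, so the media playlist declares that it is a live stream and old entries are removed.

Strictly speaking, HLS treats a playlist as live when #EXT-X-PLAYLIST-TYPE is not set. The mapping between #EXT-X-PLAYLIST-TYPE values and delivery modes is shown below. For example, with EVENT you can always seek back to the beginning, whereas with live there is no guarantee how far back you can go.

  • #EXT-X-PLAYLIST-TYPE values
    • Not specified: live (sliding window)
    • EVENT: DVR (catch-up playback; the playlist only grows)
    • VOD: VOD (fixed)

A live media playlist typically lists around 3 to 4 segments.

For example, suppose the media playlist looks like this at some point in time.

Media playlist at a given point in time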

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:4.004,
https://www.example.com/manifest_1_1.ts
#EXTINF:4.004,
https://www.example.com/manifest_1_2.ts
#EXTINF:4.004,
https://www.example.com/manifest_1_3.ts

At the next point in time, the media playlist changes as follows.

Media playlist at the next point in time

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:2
#EXTINF:4.004,
https://www.example.com/manifest_1_2.ts
#EXTINF:4.004,
https://www.example.com/manifest_1_3.ts
#EXTINF:4.004,
https://www.example.com/manifest_1_4.ts

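At the next point in time, the fourth segment (https://www.example.com/manifest_1_4.ts) is added. This is the mechanism that makes live streaming work. In addition, because of the removal, the starting number (#EXT-X-MEDIA-SEQUENCE) is incremented, and the first segment (https://www.example.com/manifest_1_1.ts), which used to be at the head, is gone. Through this append/remove processing, the media playlist lists segments as if sliding a fixed-size window along the stream.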
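
The append/remove step can be sketched as a small pure function over plain data. This is only an illustration of the sliding window, separate from the implementation later in this article; LiveWindow, slideWindow, and renderMediaPlaylist are hypothetical names.

```typescript
// Hypothetical types for this illustration only.
type LiveSegment = { duration: number; uri: string };
type LiveWindow = { mediaSequence: number; segments: LiveSegment[] };

// Append one segment and drop entries from the head so that at most
// maxSegments remain. Each dropped entry advances #EXT-X-MEDIA-SEQUENCE by one.
const slideWindow = (current: LiveWindow, added: LiveSegment, maxSegments: number): LiveWindow => {
  const segments = [...current.segments, added];
  const overflow = Math.max(0, segments.length - maxSegments);
  return {
    mediaSequence: current.mediaSequence + overflow,
    segments: segments.slice(overflow),
  };
};

// Render the window in the same shape as the examples above.
const renderMediaPlaylist = (current: LiveWindow, targetDuration: number): string => {
  const lines = [
    '#EXTM3U',
    '#EXT-X-VERSION:3',
    `#EXT-X-TARGETDURATION:${targetDuration}`,
    `#EXT-X-MEDIA-SEQUENCE:${current.mediaSequence}`,
  ];
  for (const segment of current.segments) {
    lines.push(`#EXTINF:${segment.duration.toFixed(3)},`, segment.uri);
  }
  return lines.join('\n') + '\n';
};
```

Starting from the first playlist above (media sequence 1, segments 1 to 3) and sliding in manifest_1_4.ts with a window of 3 gives the second playlist: media sequence 2 with segments 2 to 4.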

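Flow of Converting RTMP to HLS

This time we build a streaming server that receives video data over RTMP, converts it to HLS, and delivers it. That is, we extract the video data (FLV) from RTMP, convert it to the MPEG-TS container, and package it as HLS.

To do this, the implementation follows the flow below.

  1. Receive RTMP (see the first article, "RTMP 1.0 Reception")
  2. Extract the FLV carried inside RTMP (see the second article, "HTTP-FLV Delivery")
  3. Convert FLV to MPEG-TS (see the third article, "MPEG-TS Transmuxing")
  4. Package MPEG-TS into HLS (NEW!)
  5. Deliver HLS over HTTP (NEW!)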

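Packaging MPEG-TS into HLS

Implementing the Segment

A segment consists of media data and metadata. Here, we keep the duration information as a start timestamp and an end timestamp, and manage the media data alongside it.

In general, however, the packager (which builds the playlist) and the origin (which stores the segments) are separate. The packager also holds a link to the media data on the origin rather than the media data itself, which keeps the components loosely coupled. In short, the media data and the playlists are uploaded to object storage (such as S3), so the links alone are enough.

This time, though, to keep things simple, the packager also acts as the origin and manages the media data and metadata together.

I implemented the segment structure described above as follows.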

segment.mts

import { Writable } from 'node:stream';

export default class Segment {
  private beginTimestamp: number;
  private endTimestamp: number | null = null;
  private media: Buffer[] = [];

  public constructor(timestamp: number) {
    this.beginTimestamp = timestamp;
  }

  public begin(): number {
    return this.beginTimestamp;
  }

  public extinf(): number | null {
    if (this.endTimestamp == null) { return null; }
    return this.endTimestamp - this.beginTimestamp;
  }

  public write(writable: Writable): void {
    if (this.extinf() != null) {
      for (const data of this.media) {
        writable.write(data);
      }
    }

    writable.end();
    return;
  }

  public feed(data: Buffer): void {
    this.media.push(data);
  }

  public complete(timestamp: number): void {
    if (this.extinf() != null) { return; }
    this.endTimestamp = timestamp;
  }
}

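Implementing the Media Playlist

The media playlist manages the list of segments and the number of the first segment (the sequence number). Its content is simple: boilerplate header lines followed by the list of segments it holds. This time, segment URLs are hard-coded as ${segment number}.ts.

Because the media playlist manages the list of segments, it also implements removal from the playlist. One point needs care here: the implementation must account for removal racing with a client's segment request.

The client fetches the media playlist first and the segments afterwards. A segment can therefore be in the list when the media playlist is fetched but already removed from the media playlist by the time the client requests it. This happens in normal operation depending on timing, and returning 404 (Not Found) in that case causes a playback failure.

To handle this, we choose a number of segments whose deletion is deferred; those segments are no longer listed in the manifest but can still be fetched for a grace period.

I implemented the media playlist structure described above as follows.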

media-playlist.mts

import { Writable } from 'node:stream';

import Segment from './segment.mts';

const MINIMUM_LIVE_WINDOW_LENGTH = 3;

export type MediaPlaylistOption = {
  liveWindowLength: number;
  orphanedWindowLength: number;
  minimumSegmentDuration: number;
};

export const MediaPlaylistOption = {
  from(option?: Partial<MediaPlaylistOption>): MediaPlaylistOption {
    return {
      ... option,
      liveWindowLength: Math.max(option?.liveWindowLength ?? MINIMUM_LIVE_WINDOW_LENGTH, MINIMUM_LIVE_WINDOW_LENGTH),
      orphanedWindowLength: Math.max(option?.orphanedWindowLength ?? MINIMUM_LIVE_WINDOW_LENGTH, MINIMUM_LIVE_WINDOW_LENGTH),
      minimumSegmentDuration: Math.max(0, option?.minimumSegmentDuration ?? 0),
    };
  },
};

export default class MediaPlaylist {
  public published: Promise<boolean>;
  private publishedNotify: (success: boolean) => void;

  private sequenceNumber: number = -1;
  private orphanedNumber: number;

  private liveWindowLength: number;
  private orphanedWindowLength: number;

  private currentSegment: Segment | null = null;
  private segmentMap: Map<number, Segment> = new Map<number, Segment>();
  private orphanedMap: Map<number, Segment> = new Map<number, Segment>();

  private minimumSegmentDuration: number;
  private targetduration: number | null = null;

  public constructor(opt?: Partial<MediaPlaylistOption>) {
    const option = MediaPlaylistOption.from(opt);
    this.liveWindowLength = option.liveWindowLength;
    this.orphanedWindowLength = option.orphanedWindowLength;
    this.orphanedNumber = this.sequenceNumber - option.liveWindowLength;
    this.minimumSegmentDuration = option.minimumSegmentDuration;

    const { promise: published, resolve: publishedNotify } = Promise.withResolvers<boolean>();
    this.published = published;
    this.publishedNotify = publishedNotify;
  }

  public append(timestamp: number): void {
    if (this.currentSegment != null && (timestamp - this.currentSegment.begin()) < this.minimumSegmentDuration) {
      return;
    }

    this.currentSegment?.complete(timestamp);
    if (this.currentSegment) {
      this.segmentMap.set(this.sequenceNumber, this.currentSegment);
    }
    if (this.currentSegment?.extinf() != null && this.targetduration == null) {
      this.targetduration = Math.ceil(this.currentSegment.extinf()!);
    }
    if (this.sequenceNumber - this.liveWindowLength >= 0) {
      const sequenceNumber = this.sequenceNumber - this.liveWindowLength;
      const segment = this.segmentMap.get(sequenceNumber);

      if (segment) {
        this.orphanedMap.set(sequenceNumber, segment);
        this.segmentMap.delete(sequenceNumber);
      }
    }
    if (this.orphanedNumber - this.orphanedWindowLength >= 0) {
      const sequenceNumber = this.orphanedNumber - this.orphanedWindowLength;
      this.orphanedMap.delete(sequenceNumber);
    }

    this.sequenceNumber += 1;
    this.orphanedNumber += 1;
    this.currentSegment = new Segment(timestamp);

    if (this.segmentMap.size >= MINIMUM_LIVE_WINDOW_LENGTH) {
      this.publishedNotify(true);
    }
  }

  public feed(data: Iterable<Buffer>): void {
    for (const buffer of data) { this.currentSegment?.feed(buffer); }
  }

  public m3u8(): string {
    let m3u8 = '';
    m3u8 += '#EXTM3U\n';
    m3u8 += '#EXT-X-VERSION:3\n';
    m3u8 += `#EXT-X-MEDIA-SEQUENCE:${Math.max(0, this.sequenceNumber - this.liveWindowLength)}\n`;
    m3u8 += `#EXT-X-TARGETDURATION:${this.targetduration}\n`;
    m3u8 += '\n';
    for (let i = Math.max(0, this.sequenceNumber - this.liveWindowLength); i < this.sequenceNumber; i++) {
      const segment = this.segmentMap.get(i)!;
      // The EXTINF format is "#EXTINF:<duration>,[<title>]", so the trailing comma is required.
      m3u8 += `#EXTINF:${segment.extinf()!.toFixed(6)},\n`;
      m3u8 += `${i}.ts\n`;
    }

    return m3u8;
  }

  public segment(msn: number, writable: Writable, cb?: (found: boolean) => void): void {
    const segment = this.sequenceNumber === msn ? this.currentSegment : this.segmentMap.get(msn) ?? this.orphanedMap.get(msn);
    if (segment == null) {
      cb?.(false);
      return;
    }

    cb?.(true);
    segment.write(writable);
  }
}
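
One point worth checking in the class above: it fixes targetduration from the first completed segment. RFC 8216 requires that each EXTINF duration, rounded to the nearest integer, be less than or equal to #EXT-X-TARGETDURATION, so a stream whose keyframe interval fluctuates could break that rule. The following is a minimal sketch of how one might check it; fitsTargetDuration and minimumTargetDuration are hypothetical helpers, not part of the implementation above.

```typescript
// True when a segment duration (in seconds) is allowed under the given
// target duration: the duration rounded to the nearest integer must not exceed it.
const fitsTargetDuration = (extinf: number, targetDuration: number): boolean => {
  return Math.round(extinf) <= targetDuration;
};

// Smallest integer target duration that covers every given segment duration.
// Returns 0 for an empty list.
const minimumTargetDuration = (durations: number[]): number => {
  return durations.reduce((max, duration) => Math.max(max, Math.round(duration)), 0);
};
```

For example, a 5.4 second segment still fits a target duration of 5, while a 5.6 second segment does not. Since the target duration must not change once a playlist is published, one possible design is to derive it from the encoder's configured keyframe interval with some margin rather than from the first observed segment.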

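Implementing HLS Packaging

We package into HLS as part of converting RTMP (FLV) to MPEG-TS. As with the FLV-to-MPEG-TS transmuxing in the previous article, we use a fixed PID layout for simplicity.

The difference from last time is the added step that detects keyframes and splits segments. Whether a video frame is a keyframe is determined from the FLV metadata, and a keyframe starts a new segment. However, keyframes that arrive within a certain interval are ignored in order to guarantee a minimum segment duration.

Everything else is the same as when converting FLV to MPEG-TS.

I implemented the packager structure described above as follows.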

hls-generator.mts

import { Writable } from 'node:stream';

import type { Message } from './message-reader.mts';

import { read_avc_decoder_configuration_record, write_annexb_avc } from './avc.mts';
import type { AVCDecoderConfigurationRecord } from './avc.mts';
import { read_audio_specific_config, write_adts_aac } from './aac.mts';
import type { AudioSpecificConfig } from './aac.mts';

import handle_rtmp_payload, { FrameType } from './rtmp-handler.mts';
import { SectionPacketizer, PESPacketizer, PCRPacketizer, write_pat, write_pmt, write_pes, StreamType } from './mpegts.mts';
import type { PAT, PMT } from './mpegts.mts';

import MediaPlaylist from './media-playlist.mts';

const PMT_PID = 256;
const PCR_PID = 257;
const AVC_PID = 258;
const AAC_PID = 259;
const PAT_DATA = {
  transport_stream_id: 0,
  programs: [{
    program_number: 1,
    program_map_PID: PMT_PID,
  }],
} as const satisfies PAT;
const PMT_DATA = {
  program_number: 1,
  pcr_pid: PCR_PID,
  streams: [{
    stream_type: StreamType.AVC,
    elementary_PID: AVC_PID,
  }, {
    stream_type: StreamType.AAC_ADTS,
    elementary_PID: AAC_PID,
  }],
} as const satisfies PMT;
const emit_PSI_interval = 100;

const timestamp_from_rtmp_to_mpegts = (timestamp: number): number => {
  return timestamp * 90;
};

const timestamp_from_rtmp_to_hls = (timestamp: number): number => {
  return timestamp / 1000;
};

export default class HLSGenerator {
  private avcDecoderConfigurationRecord: AVCDecoderConfigurationRecord | null = null;
  private audioSpecificConfig: AudioSpecificConfig | null = null;
  private patPacketizer = new SectionPacketizer(0);
  private pmtPacketizer = new SectionPacketizer(PMT_PID);
  private pcrPacketizer = new PCRPacketizer(PCR_PID);
  private avcPacketizer = new PESPacketizer(AVC_PID);
  private aacPacketizer = new PESPacketizer(AAC_PID);
  private last_emit_PSI_timestamp: number | null = null;

  private manifest: MediaPlaylist;

  public constructor(liveWindowLength: number = 3) {
    this.manifest = new MediaPlaylist({ liveWindowLength });
  }

  public published(): Promise<boolean> {
    return this.manifest.published;
  }

  public segment(msn: number, writable: Writable, cb?: (found: boolean) => void): void {
    return this.manifest.segment(msn, writable, cb);
  }

  public m3u8(): string {
    return this.manifest.m3u8();
  }

  public feed(message: Message): void {
    if (this.last_emit_PSI_timestamp != null && (message.timestamp - this.last_emit_PSI_timestamp) >= emit_PSI_interval) {
      this.manifest.feed([
        ... this.patPacketizer.packetize(write_pat(PAT_DATA)),
        ... this.pmtPacketizer.packetize(write_pmt(PMT_DATA)),
        this.pcrPacketizer.packetize(timestamp_from_rtmp_to_mpegts(message.timestamp)),
      ]);
      this.last_emit_PSI_timestamp = message.timestamp;
    }

    const payload = handle_rtmp_payload(message);
    if (payload == null) { return; }

    switch (payload.kind) {
      case 'Video':
        if (payload.codec !== 'AVC') { return; }
        if (payload.packetType === 0) {
          this.avcDecoderConfigurationRecord = read_avc_decoder_configuration_record(payload.avcDecoderConfigurationRecord);
          return;
        }
        if (payload.type === FrameType.KEY_FRAME) {
          this.manifest.append(timestamp_from_rtmp_to_hls(payload.timestamp));
          this.manifest.feed([
            ... this.patPacketizer.packetize(write_pat(PAT_DATA)),
            ... this.pmtPacketizer.packetize(write_pmt(PMT_DATA)),
            this.pcrPacketizer.packetize(timestamp_from_rtmp_to_mpegts(payload.timestamp)),
          ]);
          this.last_emit_PSI_timestamp = payload.timestamp;
        }
        break;
      case 'Audio':
        if (payload.codec !== 'AAC') { return; }
        if (payload.packetType === 0) {
          this.audioSpecificConfig = read_audio_specific_config(payload.audioSpecificConfig);
          return;
        }
        break;
      default:
        return;
    }

    switch (payload.kind) {
      case 'Video':
        if (this.avcDecoderConfigurationRecord == null) { return; }
        this.manifest.feed(this.avcPacketizer.packetize(
          write_pes(
            write_annexb_avc(payload.data, this.avcDecoderConfigurationRecord),
            0xe0, // 0b1110XXXX: H.262/H.263/H.264/H.265 Video, stream number = 0
            timestamp_from_rtmp_to_mpegts(payload.timestamp + payload.compositionTimeOffset),
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            true,
          ),
        ));
        break;
      case 'Audio':
        if (this.audioSpecificConfig == null) { return; }
        this.manifest.feed(this.aacPacketizer.packetize(
          write_pes(
            write_adts_aac(payload.data, this.audioSpecificConfig),
            0xc0, // 0b110XXXXX: AAC Audio, stream number = 0
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            null,
            false,
          ),
        ));
        break;
    }
  }
}
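
A note on the timestamp conversion in the packager above: PTS and DTS in MPEG-TS are 33-bit counters on a 90 kHz clock, so they wrap roughly every 26.5 hours, while the conversion multiplies the RTMP millisecond timestamp by 90 without wrapping. If the PES writer does not already reduce the value to 33 bits (I am assuming here that it might not), a long-running stream would need something like the sketch below. The function name rtmpToMpegtsTimestamp is hypothetical.

```typescript
const MPEGTS_TICKS_PER_MILLISECOND = 90; // 90 kHz clock
const PTS_MODULUS = 2 ** 33;             // PTS/DTS are 33-bit values

// Convert an RTMP timestamp (milliseconds) to a 90 kHz timestamp,
// wrapped into the 33-bit range used by PTS/DTS.
// A 32-bit millisecond value times 90 stays well below 2^53, so plain
// number arithmetic is exact here.
const rtmpToMpegtsTimestamp = (timestampMs: number): number => {
  return (timestampMs * MPEGTS_TICKS_PER_MILLISECOND) % PTS_MODULUS;
};
```

For short test streams the result is identical to a plain multiplication by 90; the modulo only matters once the stream has run past the wrap point.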

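Delivering HLS over HTTP

Now let's connect the packager we just built to an HTTP server. The media playlist defines the URL rule for segments, so the server follows that rule when serving them.

I implemented the entry point, the HTTP delivery server described above, as follows.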

index.mts

import net from 'node:net';
import http from 'node:http';
import type { Duplex } from 'node:stream';
import { parseArgs } from 'node:util';
import type { ParseArgsOptionsConfig } from 'node:util';

import handle_rtmp from './rtmp-accepter.mts';

import HLSGenerator from './hls-generator.mts';

const options = {
  rtmp: {
    type: 'string',
    default: '1935',
  },
  web: {
    type: 'string',
    default: '8000',
  },
  app: {
    type: 'string',
  },
  streamKey: {
    type: 'string',
  },
  bandwidth: {
    type: 'string',
  },
} as const satisfies ParseArgsOptionsConfig;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.rtmp, 10))) {
  console.error('Please Specify valid port number'); process.exit(1);
}
if (Number.isNaN(Number.parseInt(args.web, 10))) {
  console.error('Please Specify valid port number'); process.exit(1);
}
if (args.app == null) {
  console.error('Please Specify valid app'); process.exit(1);
}
if (args.streamKey == null) {
  console.error('Please Specify valid streamKey'); process.exit(1);
}
if (args.bandwidth != null && Number.isNaN(Number.parseInt(args.bandwidth, 10))) {
  console.error('Please Specify valid bandwidth'); process.exit(1);
}
const port = Number.parseInt(args.rtmp, 10);
const web = Number.parseInt(args.web, 10);
const app = args.app;
const streamKey = args.streamKey;
const bandwidth = args.bandwidth != null ? Number.parseInt(args.bandwidth, 10) : undefined;

let rtmp_to_hls: HLSGenerator | null = null;
const handle = async (connection: Duplex) => {
  try {
    for await (const message of handle_rtmp(connection, app, streamKey, bandwidth)) {
      if (rtmp_to_hls == null) { rtmp_to_hls = new HLSGenerator(3); }
      rtmp_to_hls.feed(message);
    }
  } catch (e) {
    console.error(e);
  } finally {
    rtmp_to_hls = null;
  }
};

const rtmp_server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle(connection);
});
rtmp_server.listen(port);

const web_server = http.createServer(async (req, res) => {
  const notfound = () => {
    res.writeHead(404, { 'access-control-allow-origin': '*' });
    res.end();
  };

  if (req.url == null) {
    notfound();
    return;
  }
  const url = new URL(req.url, `http://localhost:${web}`);
  if (!(req.method === 'GET' && url.pathname.startsWith(`/${app}/${streamKey}/`))) {
    notfound();
    return;
  }
  if (rtmp_to_hls == null) {
    notfound();
    return;
  }

  const prefix = url.pathname.slice(`/${app}/${streamKey}/`.length);
  if (prefix === 'playlist.m3u8') {
    const published = await rtmp_to_hls.published();
    if (!published) {
      notfound();
      return;
    }

    res.writeHead(200, {
      'content-type': 'application/vnd.apple.mpegurl',
      'access-control-allow-origin': '*',
    });
    res.write(rtmp_to_hls.m3u8());
    res.end();
    return;
  }
  if (prefix.endsWith('.ts') && !Number.isNaN(Number.parseInt(prefix.slice(0, -3), 10))) {
    const index = Number.parseInt(prefix.slice(0, -3), 10);

    rtmp_to_hls.segment(index, res, (found: boolean) => {
      if (!found) {
        res.writeHead(404, {
          'access-control-allow-origin': '*',
        });
        res.end();
      } else {
        res.writeHead(200, {
          'content-type': 'video/mp2t',
          'access-control-allow-origin': '*',
        });
      }
    });
    return;
  }

  notfound();
});
web_server.listen({ port: web });

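Watching the Stream

HLS can be watched in Safari and QuickTime, and there are many players that play it in browsers using MSE. You can of course also watch it with ffplay or VLC by specifying the endpoint.

This time HLS is served at http://localhost:${web server port}/${app}/${streamKey}/playlist.m3u8, so after publishing over RTMP, specify that URL as the player's playback endpoint to play the stream.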
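
Because the media playlist in this article lists segments as relative URIs (${segment number}.ts), a client resolves them against the URL of the playlist itself. Here is a minimal sketch of that resolution using the standard URL class; resolveSegmentUrls is a hypothetical helper, and the app name and stream key in the usage note are made-up values.

```typescript
// Resolve every URI line of a media playlist against the playlist URL.
// Tag lines (starting with '#') and blank lines are skipped.
const resolveSegmentUrls = (playlistUrl: string, m3u8: string): string[] => {
  return m3u8
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line !== '' && !line.startsWith('#'))
    .map((uri) => new URL(uri, playlistUrl).href);
};
```

For a playlist fetched from http://localhost:8000/live/key/playlist.m3u8, the entry 3.ts resolves to http://localhost:8000/live/key/3.ts, which matches the path the HTTP server above expects. Absolute URIs pass through unchanged.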

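Summary

We have reached the big goal of implementing, and understanding, the whole path from RTMP to HLS delivery. How was it?

I hope delivery over HLS, a widely used de facto standard streaming protocol, now feels much more familiar and tangible. The code from the series so far provides all the building blocks, so I would be glad if you come away thinking "so this is all it takes to implement it." If you thought "this is easy, I could do this too," please implement it yourself and publish it as OSS. I'm looking forward to it!

Bonus

Once again, I have gathered the code snippets from this article into a single file. It comes to roughly 1,800 lines, which is a fair amount. If you paste this TypeScript code into a file, it runs with Node.js alone on Node 24, so please give it a try.

Single-file bundled version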

bundled.mts

import net from 'node:net';
import http from 'node:http';
import { randomBytes } from 'node:crypto';
import { Duplex, Writable } from 'node:stream';
import { parseArgs } from 'node:util';
import type { ParseArgsOptionsConfig } from 'node:util';

type AsyncByteReaderOption = {
  signal: AbortSignal;
}

class AsyncByteReader {
  private buffers: Buffer[] = [];
  private offset = 0;
  private totals = 0;
  private eof = false;
  private promises: [number, (result: Buffer) => void, (error: Error) => void][] = [];

  public constructor(option?: Partial<AsyncByteReaderOption>) {
    option?.signal?.addEventListener('abort', this.feedEOF.bind(this), { once: true });
  }

  private fulfill(): void {
    while (this.promises.length > 0) {
      const [length, resolve] = this.promises[0];
      if (this.totals < length) { break; }

      const drained = [];
      let remains = length;
      while (this.buffers.length > 0) {
        const buffer = this.buffers[0];
        const capacity = buffer.byteLength - this.offset;

        if (capacity > remains) {
          drained.push(buffer.subarray(this.offset, this.offset + remains));
          this.offset += remains;
          break;
        }

        drained.push(buffer.subarray(this.offset));
        this.buffers.shift();
        this.offset = 0;
        remains -= capacity;
      }

      this.totals -= length;
      resolve(Buffer.concat(drained));
      this.promises.shift();
    }

    if (!this.eof) { return; }
    while (this.promises.length > 0) {
      const [,,reject] = this.promises[0];
      reject(new Error('EOF Exception'));
      this.promises.shift();
    }
  }

  public feed(buffer: Buffer): void {
    if (this.eof) { return; }
    this.buffers.push(buffer);
    this.totals += buffer.byteLength;
    this.fulfill();
  }

  public feedEOF(): void {
    this.eof = true;
    this.fulfill();
  }

  public read(size: number): Promise<Buffer> {
    const { promise, resolve, reject } = Promise.withResolvers<Buffer>();
    this.promises.push([size, resolve, reject]);
    this.fulfill();
    return promise;
  }

  public async readU8(): Promise<number> {
    const buffer = await this.read(1);
    return buffer.readUIntBE(0, 1);
  }

  public async readU16BE(): Promise<number> {
    const buffer = await this.read(2);
    return buffer.readUIntBE(0, 2);
  }

  public async readU24BE(): Promise<number> {
    const buffer = await this.read(3);
    return buffer.readUIntBE(0, 3);
  }

  public async readU32BE(): Promise<number> {
    const buffer = await this.read(4);
    return buffer.readUIntBE(0, 4);
  }

  public async readU16LE(): Promise<number> {
    const buffer = await this.read(2);
    return buffer.readUIntLE(0, 2);
  }

  public async readU24LE(): Promise<number> {
    const buffer = await this.read(3);
    return buffer.readUIntLE(0, 3);
  }

  public async readU32LE(): Promise<number> {
    const buffer = await this.read(4);
    return buffer.readUIntLE(0, 4);
  }
}

class ByteReader {
  private buffer: Buffer;
  private offset = 0;

  public constructor(buffer: Buffer) {
    this.buffer = buffer;
  }

  public isEOF(): boolean {
    return this.offset >= this.buffer.byteLength;
  }

  public read(length: number): Buffer {
    if (length < 0) { length = this.buffer.byteLength - this.offset; }
    const value = this.buffer.subarray(this.offset, this.offset + length);
    this.offset += length;
    return value;
  }

  public readUIntBE(length: number): number {
    return this.read(length).readUIntBE(0, length);
  }

  public readIntBE(length: number): number {
    return this.read(length).readIntBE(0, length);
  }

  public readU8(): number {
    return this.readUIntBE(1);
  }

  public readU16BE(): number {
    return this.readUIntBE(2);
  }

  public readU24BE(): number {
    return this.readUIntBE(3);
  }

  public readU32BE(): number {
    return this.readUIntBE(4);
  }

  public readI8(): number {
    return this.readIntBE(1);
  }

  public readI16BE(): number {
    return this.readIntBE(2);
  }

  public readI24BE(): number {
    return this.readIntBE(3);
  }

  public readI32BE(): number {
    return this.readIntBE(4);
  }

  public readF64BE(): number {
    return this.read(8).readDoubleBE(0);
  }
}

class BitReader {
  private bits: number[];
  private reader: ByteReader;

  public constructor(data: Buffer) {
    this.bits = [];
    this.reader = new ByteReader(data);
  }

  public isEOF(): boolean {
    return this.reader.isEOF() && this.bits.length === 0;
  }

  private fill(): void {
    const byte = this.reader.readU8();
    for (let i = 7; i >= 0; i--) {
      this.bits.push((byte >> i) & 1);
    }
  }

  private shift(): number {
    if (this.isEOF()) { throw new Error('EOF Exception'); }
    if (this.bits.length === 0) { this.fill(); }
    return this.bits.shift()!;
  }

  public skipBits(length: number): void {
    while (length > 0) {
      this.shift();
      length -= 1;
    }
  }

  public readBits(length: number): number {
    let bits = 0;
    while (length > 0) {
      bits = bits * 2 + this.shift();
      length -= 1;
    }
    return bits;
  }

  public readBool(): boolean {
    return this.readBits(1) === 1;
  }
}

class BitBuilder {
  private bits: number[] = [];
  private data: number[] = [];

  private fill(): void {
    while (this.bits.length >= 8) {
      let datum = 0;
      for (let i = 0; i < 8; i++) {
        datum = datum * 2 + this.bits.shift()!;
      }
      this.data.push(datum);
    }
  }

  public writeBits(value: number, length: number): void {
    for (let i = length - 1; i >= 0; i--) {
      this.bits.push(Math.floor((value % (2 ** (i + 1))) / (2 ** i)));
    }
    this.fill();
  }

  public writeByte(value: number): void {
    this.writeBits(value, 8);
  }

  public writeBuffer(value: Buffer): void {
    for (const byte of value) { this.writeByte(byte); }
  }

  public build(): Buffer {
    const values = [... this.data];

    if (this.bits.length > 0) {
      let datum = 0;
      for (let i = 0; i < 8; i++) {
        datum = datum * 2 + (this.bits[i] ?? 0);
      }
      values.push(datum);
    }

    return Buffer.from(values);
  }
}

type AVCDecoderConfigurationRecord = {
  configurationVersion: number;
  AVCProfileIndication: number;
  profile_compatibility: number;
  AVCLevelIndication: number;
  lengthSize: number;
  SequenceParameterSets: Buffer[];
  PictureParameterSets: Buffer[];
}

const read_avc_decoder_configuration_record = (avcDecoderConfigurationRecord: Buffer): AVCDecoderConfigurationRecord => {
  const reader = new ByteReader(avcDecoderConfigurationRecord);

  const configurationVersion = reader.readU8();
  const AVCProfileIndication = reader.readU8();
  const profile_compatibility = reader.readU8();
  const AVCLevelIndication = reader.readU8();
  const lengthSize = (reader.readU8() & 0b00000011) + 1;
  const numOfSequenceParameterSets = reader.readU8() & 0b00011111;
  const SequenceParameterSets = Array.from({ length: numOfSequenceParameterSets }, () => {
    const sequenceParameterSetLength = reader.readU16BE();
    return reader.read(sequenceParameterSetLength);
  });
  const numOfPictureParameterSets = reader.readU8();
  const PictureParameterSets = Array.from({ length: numOfPictureParameterSets }, () => {
    const pictureParameterSetLength = reader.readU16BE();
    return reader.read(pictureParameterSetLength);
  });
  return {
    configurationVersion,
    AVCProfileIndication,
    profile_compatibility,
    AVCLevelIndication,
    lengthSize,
    SequenceParameterSets,
    PictureParameterSets
  };
}

const write_annexb_avc = (sizedNalus: Buffer, avcDecoderConfigurationRecord: AVCDecoderConfigurationRecord): Buffer => {
  const reader = new ByteReader(sizedNalus);

  const annexb: Buffer[] = [];
  const startcode = Buffer.from([0x00, 0x00, 0x00, 0x01]);
  let hasAUD = false;
  let hasIDR = false;
  while (!reader.isEOF()) {
    const length = reader.readUIntBE(avcDecoderConfigurationRecord.lengthSize);
    const nalu = reader.read(length);
    const naluType = nalu.readUInt8(0) & 0x1F;

    switch (naluType) {
      case 0x09: hasAUD = true; break;
      case 0x05: { // IDR
        if (hasIDR) { break; }
        for (const sps of avcDecoderConfigurationRecord.SequenceParameterSets) {
          annexb.push(startcode, sps);
        }
        for (const pps of avcDecoderConfigurationRecord.PictureParameterSets) {
          annexb.push(startcode, pps);
        }
        hasIDR = true;
        break;
      }
    }
    annexb.push(startcode, nalu);
  }
  const aud = hasAUD ? [] : [startcode, Buffer.from([0x09, 0xF0])];
  return Buffer.concat([... aud, ... annexb]);
}

const samplingFrequencyTable = [
  96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050,
  16000, 12000, 11025, 8000, 7350,
] as const;

type AudioSpecificConfig = {
  audioObjectType: number;
  samplingFrequencyIndex: number;
  samplingFrequency: number;
  channelConfiguration: number;
}

const read_audio_specific_config = (audioSpecificConfig: Buffer): AudioSpecificConfig => {
  const reader = new BitReader(audioSpecificConfig);

  let audioObjectType = reader.readBits(5);
  if (audioObjectType === 31) { audioObjectType = 32 + reader.readBits(6); }

  const samplingFrequencyIndex = reader.readBits(4);
  const samplingFrequency = samplingFrequencyIndex === 0x0f ? reader.readBits(24) : samplingFrequencyTable[samplingFrequencyIndex];
  const channelConfiguration = reader.readBits(4);

  return {
    audioObjectType,
    samplingFrequencyIndex,
    samplingFrequency,
    channelConfiguration
  };
}

const write_adts_aac = (data: Buffer, audioSpecificConfig: AudioSpecificConfig): Buffer => {
  const builder = new BitBuilder();

  // TODO: should be restricted to AAC-LC or Main with samplingFrequencyIndex !== 0x0F
  const { audioObjectType, samplingFrequencyIndex, channelConfiguration } = audioSpecificConfig;
  const frameLength = 7 + data.byteLength;

  builder.writeBits(0xFFF, 12); // syncword
  builder.writeBits(1, 1); // mpeg_version
  builder.writeBits(0, 2); // layer
  builder.writeBits(1, 1); // protection_absent (protected = 0)
  builder.writeBits(audioObjectType - 1, 2); // profile
  builder.writeBits(samplingFrequencyIndex, 4); // sampling_frequency_index
  builder.writeBits(0, 1); // private_bit
  builder.writeBits(channelConfiguration, 3); // channel_configuration
  builder.writeBits(0, 1); // original/copy
  builder.writeBits(0, 1); // home
  builder.writeBits(0, 1); // copyright_identification_bit
  builder.writeBits(0, 1); // copyright_identification_start
  builder.writeBits(frameLength, 13); // frame_length
  builder.writeBits(0x7FF, 11); // adts_buffer_fullness
  builder.writeBits(0, 2); // number_of_raw_data_blocks_in_frame

  return Buffer.concat([builder.build(), data]);
}

const PACKET_SIZE = 188;
const HEADER_SIZE = 4;
const PAYLOAD_SIZE = PACKET_SIZE - HEADER_SIZE;
const SYNC_BYTE = 0x47;
const STUFFING_BYTE = 0xFF;

const pes_not_contain_flags = new Set([0xBC, 0xBE, 0xBF, 0xF0, 0xF1, 0xFF, 0xF2, 0xF8]);
const pes_has_flags = (stream_id: number) => {
  return !pes_not_contain_flags.has(stream_id);
}

const crc32_table = new Uint32Array(256);
for (let i = 0; i < crc32_table.length; i++) {
  let c = i << 24;
  for (let j = 0; j < 8; j++) {
    c = ((c << 1) ^ ((c & 0x80000000) ? 0x04c11db7 : 0)) >>> 0;
  }
  crc32_table[i] = c;
}
const crc32 = (data: Buffer) => {
  let crc = 0xFFFFFFFF;
  for (const datum of data) {
    crc = ((crc << 8) ^ crc32_table[((crc >>> 24) ^ datum) & 0xFF]) >>> 0;
  }
  return crc;
}

type PAT = {
  transport_stream_id: number;
  programs: {
    program_number: number;
    program_map_PID: number;
  }[];
};

const write_pat = (pat: PAT) => {
  const builder = new BitBuilder();

  builder.writeBits(0x00, 8); // table_id
  builder.writeBits(1, 1); // section_syntax_indicator
  builder.writeBits(0, 1); // 0
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 12); // section_length
  builder.writeBits(pat.transport_stream_id, 16); // transport_stream_id
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 5); // version_number
  builder.writeBits(0b1, 1); // current_next_indicator
  builder.writeBits(0, 8); // section_number
  builder.writeBits(0, 8); // last_section_number
  for (const { program_number, program_map_PID } of pat.programs) {
    builder.writeBits(program_number, 16); // program_number
    builder.writeBits(0b111, 3); // reserved
    builder.writeBits(program_map_PID, 13); // program_map_PID
  }
  builder.writeBits(0, 32); // CRC32
  const section = builder.build();
  section.writeUInt16BE(section.readUInt16BE(1) | ((section.byteLength - 3) & 0x0FFF), 1);
  section.writeUInt32BE(crc32(section.subarray(0, -4)), section.byteLength - 4);

  return section;
}

const StreamType = {
  PRIVATE_DATA: 0x06,
  AAC_ADTS: 0x0f,
  AAC_LATM: 0x11,
  AVC: 0x1b,
  METADATA: 0x15,
  HEVC: 0x24,
};

type PMT = {
  program_number: number;
  pcr_pid: number;
  program_info?: Buffer;
  streams: {
    stream_type: number;
    elementary_PID: number;
    ES_info?: Buffer;
  }[];
};

const write_pmt = (pmt: PMT) => {
  const builder = new BitBuilder();

  builder.writeBits(0x02, 8); // table_id
  builder.writeBits(1, 1); // section_syntax_indicator
  builder.writeBits(0, 1); // 0
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 12); // section_length
  builder.writeBits(pmt.program_number, 16); // program_number
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 5); // version_number
  builder.writeBits(0b1, 1); // current_next_indicator
  builder.writeBits(0, 8); // section_number
  builder.writeBits(0, 8); // last_section_number
  builder.writeBits(0b111, 3); // reserved
  builder.writeBits(pmt.pcr_pid, 13); // PCR_PID
  builder.writeBits(0b1111, 4); // reserved
  builder.writeBits(pmt.program_info?.byteLength ?? 0, 12); // program_info_length
  builder.writeBuffer(pmt.program_info ?? Buffer.from([])); // program_info
  for (const { stream_type, elementary_PID, ES_info } of pmt.streams) {
    builder.writeBits(stream_type, 8); // stream_type
    builder.writeBits(0b111, 3); // reserved
    builder.writeBits(elementary_PID, 13); // elementary_PID
    builder.writeBits(0b1111, 4); // reserved
    builder.writeBits(ES_info?.byteLength ?? 0, 12); // ES_info_length
    builder.writeBuffer(ES_info ?? Buffer.from([])); // ES_info
  }
  builder.writeBits(0, 32); // CRC32
  const section = builder.build();
  section.writeUInt16BE(section.readUInt16BE(1) | ((section.byteLength - 3) & 0x0FFF), 1);
  section.writeUInt32BE(crc32(section.subarray(0, -4)), section.byteLength - 4);

  return section;
};

const write_pes = (data: Buffer, stream_id: number, pts: number | null, dts: number | null, omit_length = false) => {
  if (pts == null) { dts = null; } // a DTS may only be written together with a PTS

  const builder = new BitBuilder();
  const PES_header_data_length = (pts != null ? 5 : 0) + (dts != null ? 5 : 0);

  builder.writeBits(1, 24); // start_code_prefix
  builder.writeBits(stream_id, 8); // stream_id
  builder.writeBits(0, 16); // pes_length
  if (pes_has_flags(stream_id)) {
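    builder.writeBits(0b10000000, 8); // '10', PES_scrambling_control, PES_priority, data_alignment_indicator, copyright, original_or_copy
    builder.writeBits(pts != null ? 0b1 : 0b0, 1); // pts present
    builder.writeBits(dts != null ? 0b1 : 0b0, 1); // dts present
    builder.writeBits(0b000000, 6); // ESCR, ES_rate, DSM_trick_mode, additional_copy_info, PES_CRC, PES_extension flags (all off)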
    builder.writeBits(PES_header_data_length, 8); // PES_header_data_length
    if (pts != null) {
      const pts_binary = Buffer.alloc(5)
      pts_binary.writeUIntBE(pts, 0, 5);
      const reader = new BitReader(pts_binary);
      reader.skipBits(7);

      builder.writeBits(dts != null ? 0b0011 : 0b0010, 4);
      builder.writeBits(reader.readBits(3), 3);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
    }
    if (dts != null) {
      const dts_binary = Buffer.alloc(5)
      dts_binary.writeUIntBE(dts, 0, 5);
      const reader = new BitReader(dts_binary);
      reader.skipBits(7);

      builder.writeBits(0b0001, 4);
      builder.writeBits(reader.readBits(3), 3);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
    }
  }
  const header = builder.build();
  const pes = Buffer.concat([header, data]);
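  // PES_packet_length counts the bytes that follow the length field itself.
  // Leaving it 0 (omit_length) means "unbounded", which is only allowed for video streams.
  if (!omit_length) { pes.writeUInt16BE(pes.byteLength - 6, 4); }
  return pes;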
}

class SectionPacketizer {
  private continuity_counter: number = 0;
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(section: Buffer): Buffer[] {
    const packets: Buffer[] = [];
    for (let i = 0; i < section.byteLength; i += (PAYLOAD_SIZE - (i === 0 ? 1 : 0))) {
      const packet = Buffer.alloc(PACKET_SIZE, 0xFF);
      const length = Math.min(i + (PAYLOAD_SIZE - (i === 0 ? 1 : 0)), section.byteLength) - i;

      const builder = new BitBuilder();
      builder.writeBits(SYNC_BYTE, 8);
      builder.writeBits(0, 1); // transport_err_indicator
      builder.writeBits(i === 0 ? 1 : 0, 1); // payload_unit_start_indicator
      builder.writeBits(0, 1); // transport_priority
      builder.writeBits(this.pid, 13);
      builder.writeBits(0, 2); // transport_scrambling_control
      builder.writeBits(0, 1); // adaptation_field_control (adaptation_field present)
      builder.writeBits(1, 1); // adaptation_field_control (payload present)
      builder.writeBits(this.continuity_counter, 4); // continuity_counter
      if (i === 0) { builder.writeBits(0, 8); } // pointer_field

      const header = builder.build();
      header.copy(packet, 0);
      section.copy(packet, header.byteLength, i, i + length);

      this.continuity_counter = (this.continuity_counter + 1) & 0x0F;
      packets.push(packet);
    }
    return packets;
  }
}

class PESPacketizer {
  private continuity_counter: number = 0;
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(pes: Buffer): Buffer[] {
    const packets: Buffer[] = [];
    for (let i = 0; i < pes.byteLength; i += PAYLOAD_SIZE) {
      const packet = Buffer.alloc(PACKET_SIZE, 0xFF);
      const length = Math.min(i + PAYLOAD_SIZE, pes.byteLength) - i;
      const filler = PAYLOAD_SIZE - length;

      const builder = new BitBuilder();
      builder.writeBits(SYNC_BYTE, 8);
      builder.writeBits(0, 1); // transport_err_indicator
      builder.writeBits(i === 0 ? 1 : 0, 1); // payload_unit_start_indicator
      builder.writeBits(0, 1); // transport_priority
      builder.writeBits(this.pid, 13);
      builder.writeBits(0, 2); // transport_scrambling_control
      builder.writeBits(filler > 0 ? 1 : 0, 1); // adaptation_field_control (adaptation_field present)
      builder.writeBits(1, 1); // adaptation_field_control (payload present)
      builder.writeBits(this.continuity_counter, 4); // continuity_counter
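      if (filler > 0) { builder.writeBits(filler - 1, 8); } // adaptation_field_length
      if (filler > 1) { builder.writeBits(0, 8); } // adaptation field flags (all off); the stuffing bytes are the 0xFF the packet was prefilled with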

      const header = builder.build();
      header.copy(packet, 0);
      pes.copy(packet, header.byteLength + Math.max(filler - 2, 0), i, i + length);

      this.continuity_counter = (this.continuity_counter + 1) & 0x0F;
      packets.push(packet);
    }
    return packets;
  }
}

class PCRPacketizer {
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(pcr: number): Buffer {
    const packet = Buffer.alloc(PACKET_SIZE, STUFFING_BYTE);

    const builder = new BitBuilder();
    builder.writeBits(SYNC_BYTE, 8);
    builder.writeBits(0, 1); // transport_err_indicator
    builder.writeBits(0, 1); // payload_unit_start_indicator
    builder.writeBits(0, 1); // transport_priority
    builder.writeBits(this.pid, 13);
    builder.writeBits(0, 2); // transport_scrambling_control
    builder.writeBits(1, 1); // adaptation_field_control (adaptation_field_present)
    builder.writeBits(0, 1); // adaptation_field_control (payload present)
    builder.writeBits(0, 4); // continuity_counter
    builder.writeBits(PACKET_SIZE - HEADER_SIZE - 1, 8); // adaptation field length
    builder.writeBits(0x10, 8); // PCR is Present
    builder.writeBits(pcr, 33); // PCR base
    builder.writeBits(0b111111, 6); // reserved (all bits set to 1, like the other reserved fields)
    builder.writeBits(0, 9); // PCR extension

    builder.build().copy(packet);
    return packet;
  }
}

const UTF8Decoder = new TextDecoder('utf-8');
// Sentinel returned by read_value for the AMF0 object-end marker (type 9)
const scriptend: unique symbol = Symbol();

const read_string = (reader: ByteReader): string => {
  const length = reader.readU16BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_longstring = (reader: ByteReader): string => {
  const length = reader.readU32BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_object = (reader: ByteReader): Record<string, any> => {
  const object: Record<string, any> = {};
  while (true) {
    const name = read_string(reader);
    const val = read_value(reader);
    if (val === scriptend) { return object; }
    object[name] = val;
  }
}

const read_mixedarray = (reader: ByteReader): Record<string, any>  => {
  reader.readU32BE(); // length
  return read_object(reader);
}

const read_strictarray = (reader: ByteReader): any[] => {
  const length = reader.readU32BE();
  const array = [];
  for (let i = 0; i < length; i++) {
    array.push(read_value(reader));
  }
  return array;
}

const read_date = (reader: ByteReader): Date => {
  const timestamp = reader.readF64BE();
  reader.readI16BE(); // time-zone offset: read to advance the reader, value is ignored
  return new Date(timestamp);
}

const read_value = (reader: ByteReader): any => {
  const tag = reader.readU8();
  switch (tag) {
    case 0: return reader.readF64BE();
    case 1: return reader.readU8() !== 0;
    case 2: return read_string(reader);
    case 3: return read_object(reader);
    case 4: throw new Error('Unsupported Tag: 4 (movie clip)');
    case 5: return null;
    case 6: return undefined;
    case 7: throw new Error('Unsupported Tag: 7 (reference)');
    case 8: return read_mixedarray(reader);
    case 9: return scriptend;
    case 10: return read_strictarray(reader);
    case 11: return read_date(reader);
    case 12: return read_longstring(reader);
  }
  throw new Error(`Invalid tag: ${tag}`);
}

const read_amf0 = (data: Buffer): any[] => {
  const reader = new ByteReader(data);
  const result = [];
  while (!reader.isEOF()) {
    result.push(read_value(reader));
  }
  return result;
}

const UTF8Encoder = new TextEncoder();

const write_number = (data: number): Buffer => {
  const buffer = Buffer.alloc(8)
  buffer.writeDoubleBE(data, 0);
  return buffer;
}

const write_string = (data: string): Buffer => {
  const utf8 = Buffer.from(UTF8Encoder.encode(data));
  const length = Buffer.alloc(2);
  length.writeUInt16BE(utf8.byteLength);
  return Buffer.concat([length, utf8]);
}

const write_object = (obj: Record<string, any>): Buffer => {
  return Buffer.concat([... Object.entries(obj).map(([k, v]) => Buffer.concat([write_string(k), write_value(v)])), Buffer.from([0x00, 0x00, 0x09])]);
}

const write_array = (data: any[]): Buffer => {
  const length = Buffer.alloc(4);
  length.writeUInt32BE(data.length);
  return Buffer.concat([length, ... data.map((datum) => write_value(datum))]);
}

const write_date = (data: Date): Buffer => {
  const buffer = Buffer.alloc(10);
  buffer.writeDoubleBE(data.getTime(), 0);
  buffer.writeUInt16BE(0, 8);
  return buffer;
}

const write_value = (data: any): Buffer => {
  if (data === null) { return Buffer.from([0x05]); }
  if (data === undefined) { return Buffer.from([0x06]); }
  if (Array.isArray(data)) { return Buffer.concat([Buffer.from([0x0a]), write_array(data)]); }
  if (data instanceof Date) { return Buffer.concat([Buffer.from([0x0b]), write_date(data)]); }
  switch (typeof data) {
    case 'number': return Buffer.concat([Buffer.from([0x00]), write_number(data)]);
    case 'boolean': return Buffer.from([0x01, data ? 1 : 0]);
    case 'string': return Buffer.concat([Buffer.from([0x02]), write_string(data)]);
    case 'object': return Buffer.concat([Buffer.from([0x03]), write_object(data)]);
    default: return Buffer.from([]);
  }
}

const write_amf0 = (... data: any[]): Buffer => {
  return Buffer.concat(data.map((datum) => write_value(datum)));
}

type MessageInformation = {
  message_type_id: number;
  message_stream_id: number;
  message_length: number;
  timestamp: number;
  timestamp_delta: number | null;
  is_extended_timestamp: boolean;
}

const MessageType = {
  SetChunkSize: 1,
  Abort: 2,
  Acknowledgement: 3,
  UserControl: 4,
  WindowAcknowledgementSize: 5,
  SetPeerBandwidth: 6,
  Audio: 8,
  Video: 9,
  DataAMF3: 15,
  CommandAMF3: 17,
  DataAMF0: 18,
  CommandAMF0: 20,
} as const;

type Message = Omit<MessageInformation, 'timestamp_delta' | 'is_extended_timestamp'> & {
  data: Buffer;
}

async function* read_message(reader: AsyncByteReader): AsyncIterable<Message> {
  let chunk_maximum_size = 128; // default value; changed by the Set Chunk Size protocol control message
  const informations = new Map<number, MessageInformation>();
  const chunks = new Map<number, Buffer[]>();
  const lengths = new Map<number, number>();

  while (true) {
    const basic = await reader.readU8();
    const fmt = (basic & 0b11000000) >> 6;
    let cs_id = (basic & 0b00111111) >> 0;
    switch (cs_id) {
      case 0: cs_id = 64 + await reader.readU8(); break;
      case 1: cs_id = 64 + await reader.readU16LE(); break;
    }
    if (fmt !== 3) { chunks.set(cs_id, []); lengths.set(cs_id, 0); }
    const chunk_buffers = chunks.get(cs_id) ?? [];
    const chunk_length = lengths.get(cs_id) ?? 0;
    let information = informations.get(cs_id);

    let timestamp = fmt !== 3 ? await reader.readU24BE() : information?.timestamp;
    const message_length = fmt === 0 || fmt === 1 ? await reader.readU24BE() : information?.message_length;
    const message_type_id = fmt === 0 || fmt === 1 ? await reader.readU8() : information?.message_type_id;
    const message_stream_id = fmt === 0 ? await reader.readU32LE() : information?.message_stream_id;
    const is_extended_timestamp = fmt !== 3 ? timestamp === 0xFFFFFF : information?.is_extended_timestamp;
    // Without the previous header information for this chunk stream, the chunk cannot be parsed.
    // For simplicity, stop receiving and return.
    if (timestamp == null || message_length == null || message_type_id == null || message_stream_id == null || is_extended_timestamp == null) { return; }
    const extended_timestamp = is_extended_timestamp ? await reader.readU32BE() : null;

    let timestamp_delta = null;
    if (fmt === 1 || fmt === 2) {
      if (information?.timestamp == null) { return; }
      timestamp_delta = extended_timestamp ?? timestamp;
      timestamp = information.timestamp + timestamp_delta;
    } else if (fmt === 3) {
      timestamp_delta = extended_timestamp ?? information?.timestamp_delta ?? timestamp;
      timestamp += (chunk_length > 0) ? 0 : timestamp_delta;
    } else {
      timestamp = extended_timestamp ?? timestamp;
    }

    information = { message_type_id, message_stream_id, message_length, timestamp, timestamp_delta, is_extended_timestamp } satisfies MessageInformation;
    const chunk = await reader.read(Math.min(message_length - chunk_length, chunk_maximum_size));
    const length = chunk_length + chunk.byteLength;
    if (length >= message_length) {
      if (length === message_length) {
        const data = Buffer.concat([... chunk_buffers, chunk])
        if (message_type_id === MessageType.SetChunkSize) {
          chunk_maximum_size = Math.max(1, Math.min(2 ** 31 - 1, data.readUInt32BE(0)));
        } else {
          const { timestamp_delta, is_extended_timestamp, ... message } = information;
          yield { ... message, data };
        }
      }
      chunks.set(cs_id, []);
      lengths.set(cs_id, 0);
    } else {
      chunks.set(cs_id, [... chunk_buffers, chunk]);
      lengths.set(cs_id, length);
    }
    informations.set(cs_id, information);
  }
}

const write_message = (message: Omit<Message, 'message_length'>): Buffer => {
  const chunk_maximum_size = 128; // default chunk size; this writer never changes it
  const result: Buffer[] = [];

  for (let i = 0; i < message.data.byteLength; i += chunk_maximum_size) {
    const chunk = message.data.subarray(i, Math.min(message.data.byteLength, i + chunk_maximum_size));
    const cs_id = 3; // we only send system-type messages, so the chunk stream ID is fixed to 3
    const fmt = i === 0 ? 0 : 3; // for simplicity, only fmt 0 and fmt 3 are used
    const basic = Buffer.from([(fmt << 6) | cs_id]);

    if (fmt === 3) {
      result.push(basic, chunk);
      continue;
    }

    const header = Buffer.alloc(message.timestamp >= 0xFFFFFF ? 15 : 11);
    header.writeUIntBE(Math.min(message.timestamp, 0xFFFFFF), 0, 3);
    header.writeUIntBE(message.data.byteLength, 3, 3);
    header.writeUIntBE(message.message_type_id, 6, 1);
    header.writeUIntLE(message.message_stream_id, 7, 4);
    if (message.timestamp >= 0xFFFFFF) {
      header.writeUIntBE(message.timestamp, 11, 4);
    }

    result.push(basic, header, chunk);
  }

  return Buffer.concat(result);
}

class BandwidthQuotaExceededError extends Error {
  constructor(message: string, option?: ErrorOptions) {
    super(message, option);
    this.name = this.constructor.name;
  }
}

type BandwidthEstimatorOption = {
  intervalMills: number;
  movingAverageLength: number;
}
const BandwidthEstimatorOption = {
  from(option?: Partial<BandwidthEstimatorOption>): BandwidthEstimatorOption {
    return { intervalMills: 100, movingAverageLength: 10, ... option };
  }
}

class BandwidthEstimator {
  private option: BandwidthEstimatorOption;
  private controller: AbortController;
  private limit: number;
  private timerId: NodeJS.Timeout | null = null;
  private readonly measureHandler = this.measure.bind(this);
  private estimates: number[] = [];
  private movingAverageLength: number;
  private totalBytes: number = 0;
  private previousTime: DOMHighResTimeStamp = performance.now();

  public constructor(limit: number, controller: AbortController, option?: Partial<BandwidthEstimatorOption>) {
    this.limit = limit;
    this.controller = controller;
    this.option = BandwidthEstimatorOption.from(option);
    this.movingAverageLength = this.option.movingAverageLength;
    this.timerId = setInterval(this.measureHandler, this.option.intervalMills);
  }

  public feed(byteLength: number) {
    this.totalBytes += byteLength;
  }

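  public estimate(): number {
    // Average over the samples actually collected, so the value is meaningful before the window fills up
    if (this.estimates.length === 0) { return 0; }
    return this.estimates.reduce((sum, curr) => sum + curr, 0) / this.estimates.length;
  }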

  public measure(): void {
    const currentTime = performance.now();
    const duration = (currentTime - this.previousTime) / 1000;
    const bandwidth = (this.totalBytes * 8) / duration;
    this.estimates.push(bandwidth);

    if (this.estimates.length >= this.movingAverageLength) {
      this.estimates.splice(0, this.estimates.length - this.movingAverageLength);
      const average = this.estimate();
      if (average >= this.limit) {
        this.controller.abort(new BandwidthQuotaExceededError(`Bandwidth quota exceeded! limit: ${this.limit}, actual: ${Math.ceil(average)}`));
      }
    }

    this.previousTime = currentTime;
    this.totalBytes = 0;
  }

  public destroy(): void {
    if (this.timerId == null) { return; }
    clearInterval(this.timerId);
    this.timerId = null;
  }
}

const options = {
  rtmp: {
    type: 'string',
    default: '1935',
  },
  web: {
    type: 'string',
    default: '8000',
  },
  app: {
    type: 'string'
  },
  streamKey: {
    type: 'string'
  },
  highWaterMark: {
    type: 'string'
  },
  bandwidth: {
    type: 'string'
  }
} as const satisfies ParseArgsOptionsConfig;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.rtmp, 10))) {
  console.error('Please specify a valid RTMP port number (--rtmp)'); process.exit(1);
}
if (Number.isNaN(Number.parseInt(args.web, 10))) {
  console.error('Please specify a valid web port number (--web)'); process.exit(1);
}
if (args.app == null) {
  console.error('Please specify an app (--app)'); process.exit(1);
}
if (args.streamKey == null) {
  console.error('Please specify a streamKey (--streamKey)'); process.exit(1);
}
if (args.bandwidth != null && Number.isNaN(Number.parseInt(args.bandwidth, 10))) {
  console.error('Please specify a valid bandwidth (--bandwidth)'); process.exit(1);
}
const port = Number.parseInt(args.rtmp, 10);
const web = Number.parseInt(args.web, 10);
const app = args.app;
const streamKey = args.streamKey;
const bandwidth = args.bandwidth != null ? Number.parseInt(args.bandwidth, 10) : undefined;

const handle_handshake = async (reader: AsyncByteReader, connection: Duplex): Promise<boolean> => {
  // C0/S0
  await reader.readU8(); // Read C0
  connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
  // C1/S1
  const c1 = await reader.read(1536); // read C1
  const c1_random = c1.subarray(8);
  const s1_random = randomBytes(1536 - 8);
  const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
  connection.write(s1); // write S1
  // C2/S2
  const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
  connection.write(s2); // write S2
  const c2 = await reader.read(1536); // read C2
  const c2_random_echo = c2.subarray(8);
  // Check Random Echo
  return s1_random.equals(c2_random_echo);
}

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
  DISCONNECTED: 'DISCONNECTED',
} as const;
type Option = {
  id: Symbol;
  app: string;
  streamKey: string;
};

const MESSAGE_STREAM = 1;
let lock: Symbol | null = null;
const need_yield = (state: (typeof STATE)[keyof typeof STATE], message: Message): boolean => {
  if (state !== STATE.PUBLISHED) { return false; }
  if (message.message_stream_id !== MESSAGE_STREAM) { return false; }

  switch (message.message_type_id) {
    case MessageType.Audio: return true;
    case MessageType.Video: return true;
    case MessageType.DataAMF0: return true;
    default: return false;
  }
};
const TRANSITION = {
  [STATE.WAITING_CONNECT]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== 0) { return STATE.WAITING_CONNECT; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.WAITING_CONNECT; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'connect') { return STATE.WAITING_CONNECT; }
    const transaction_id = command[1];
    const appName = command[2]['app'];
    const connectAccepted = appName === option.app;

    const info = connectAccepted ? {
      code: 'NetConnection.Connect.Success',
      description: 'Connection succeeded.',
      data: { version: '3,5,7,7009', },
      objectEncoding: 0, // 0 = AMF0, 3 = AMF3
      level: 'status', // success case
    } : {
      code: 'NetConnection.Connect.Rejected',
      description: 'Connection rejected.',
      level: 'error', // failure case
    };

    const result = write_amf0(
      '_result',
      transaction_id,
      {
        fmsVer: 'FMS/3,5,7,7009',
        capabilities: 31,
        mode: 1,
      },
      info
    );
    connection.write(write_message({
      message_type_id: MessageType.CommandAMF0,
      message_stream_id: 0,
      timestamp: 0,
      data: result,
    }));

    if (!connectAccepted) { return STATE.DISCONNECTED; }
    return STATE.WAITING_CREATESTREAM;
  },
  [STATE.WAITING_CREATESTREAM]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== 0) { return STATE.WAITING_CREATESTREAM; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.WAITING_CREATESTREAM; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'createStream') { return STATE.WAITING_CREATESTREAM; }
    const transaction_id = command[1];

    // message_stream_id 0 is reserved (it is the one in use right now), so use 1
    const result = write_amf0('_result', transaction_id, null, MESSAGE_STREAM);
    connection.write(write_message({
      message_type_id: MessageType.CommandAMF0,
      message_stream_id: 0,
      timestamp: 0,
      data: result,
    }));

    return STATE.WAITING_PUBLISH;
  },
  [STATE.WAITING_PUBLISH]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== MESSAGE_STREAM) { return STATE.WAITING_PUBLISH; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.WAITING_PUBLISH; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'publish') { return STATE.WAITING_PUBLISH; }
    const transaction_id = command[1];
    const streamKey = command[3];
    const publishAccepted = streamKey === option.streamKey && lock == null; // allow publishing only when the streamKey matches and nobody else is publishing

    const info = publishAccepted ? {
      code: 'NetStream.Publish.Start',
      description: 'Publish Accepted',
      level: 'status' // success case
    } : {
      code: 'NetStream.Publish.Failed', // Permission denied
      description: 'Publish Failed',
      level: 'error' // failure case
    };

    const result = write_amf0('onStatus', transaction_id, null, info);
    connection.write(write_message({
      message_type_id: MessageType.CommandAMF0,
      message_stream_id: message.message_stream_id,
      timestamp: 0,
      data: result,
    }));

    if (!publishAccepted) { return STATE.DISCONNECTED; }
    lock = option.id;
    return STATE.PUBLISHED;
  },
  [STATE.PUBLISHED]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== 0) { return STATE.PUBLISHED; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.PUBLISHED; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'deleteStream') { return STATE.PUBLISHED; }
    const stream = command[3];
    if (stream !== MESSAGE_STREAM) { return STATE.PUBLISHED; }

    return STATE.DISCONNECTED;
  },
  [STATE.DISCONNECTED]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    return STATE.DISCONNECTED;
  }
} as const satisfies Record<(typeof STATE)[keyof typeof STATE], (message: Message, connection: Duplex, option: Option) => (typeof STATE)[keyof typeof STATE]>;

class DisconnectError extends Error {
  constructor(message: string, option?: ErrorOptions) {
    super(message, option);
    this.name = this.constructor.name;
  }
}

async function* handle_rtmp(connection: Duplex, app: string, key: string, limit?: number): AsyncIterable<Message> {
  const option = {
    id: Symbol(),
    app: app,
    streamKey: key,
  } satisfies Option;
  const controller = new AbortController();
  const reader = new AsyncByteReader({ signal: controller.signal });
  const estimator = new BandwidthEstimator(limit ?? Number.POSITIVE_INFINITY, controller);
  connection.pipe(new Writable({
    write(data, _, cb) { reader.feed(data); estimator.feed(data.byteLength); cb(); },
  }));
  const disconnected = () => { controller.abort(new DisconnectError('Disconnected!')); }
  connection.addListener('close', disconnected);

  try {
    /*
    * Handle the RTMP handshake
    */
    if (!await handle_handshake(reader, connection)) { return; }
    /*
    * Handle RTMP messages
    */
    let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
    for await (const message of read_message(reader)) {
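      // Messages that are handled the same way in every state are processed here

      // If this is a video/audio/data message for the upper layer, pass it on
      if (need_yield(state, message)) { yield message; }

      // State transition driven by the individual message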
      state = TRANSITION[state](message, connection, option);
      if (state === STATE.DISCONNECTED) { return; }
    }
  } finally {
    connection.removeListener('close', disconnected);
    if (lock === option.id) { lock = null; }
    connection.destroy();
    estimator.destroy();
    reader.feedEOF();
  }
}

type VideoMessage = Message & {
  message_type_id: (typeof MessageType.Video);
};
type AudioMessage = Message & {
  message_type_id: (typeof MessageType.Audio);
}
type AMF0DataMessage = Message & {
  message_type_id: (typeof MessageType.DataAMF0);
}

const FrameType = {
  KEY_FRAME: 1,
  INTER_FRAME: 2,
  DISPOSABLE_INTER_FRAME: 3,
  GENERATED_KEY_FRAME: 4,
  VIDEO_INFO_OR_COMMAND: 5,
} as const;

const VideoCodecType = {
  AVC: 7,
} as const;

const AudioCodecType = {
  AAC: 10,
} as const;

type AVCData = {
  codec: 'AVC',
} & ({
  packetType: 0,
  avcDecoderConfigurationRecord: Buffer;
} | {
  packetType: 1,
  compositionTimeOffset: number;
  data: Buffer;
});
type VideoData = AVCData & {
  timestamp: number;
  type: number;
  kind: 'Video';
};

type AACData = {
  codec: 'AAC',
} & ({
  packetType: 0;
  audioSpecificConfig: Buffer;
} | {
  packetType: 1;
  data: Buffer;
});
type AudioData = AACData & {
  timestamp: number;
  kind: 'Audio';
};

type ActionScriptData = {
  timestamp: number;
  kind: 'Data',
  values: any[];
}

const handle_avc = (reader: ByteReader): AVCData | null => {
  const packetType = reader.readU8();
  const compositionTimeOffset = reader.readI24BE();

  switch (packetType) {
    case 0: return {
      codec: 'AVC',
      packetType,
      avcDecoderConfigurationRecord: reader.read(-1),
    };
    case 1: return {
      codec: 'AVC',
      packetType,
      compositionTimeOffset,
      data: reader.read(-1),
    };
    default:
      return null;
  }
}

const handle_video = (message: VideoMessage): VideoData | null => {
  const reader = new ByteReader(message.data);

  const meta = reader.readU8();
  const type = (meta & 0xF0) >> 4;
  const codec = (meta & 0x0F) >> 0;

  switch (codec) {
    case VideoCodecType.AVC: {
      const avc = handle_avc(reader);
      if (avc == null) { return null; }
      return { kind: 'Video', type, timestamp: message.timestamp, ... avc };
    }
    default:
      return null;
  }
}

const handle_aac = (reader: ByteReader): AACData | null => {
  const packetType = reader.readU8();
  switch (packetType) {
    case 0: return {
      codec: 'AAC',
      packetType,
      audioSpecificConfig: reader.read(-1)
    };
    case 1: return {
      codec: 'AAC',
      packetType,
      data: reader.read(-1),
    };
    default:
      return null;
  }
}

const handle_audio = (message: AudioMessage): AudioData | null => {
  const reader = new ByteReader(message.data);

  const meta = reader.readU8();
  const codec = (meta & 0xF0) >> 4;
  const rate = (meta & 0x0C) >> 2;
  const size = (meta & 0x02) >> 1;
  const type = (meta & 0x01) >> 0;

  switch (codec) {
    case AudioCodecType.AAC: {
      // For AAC, the rate, size, and type bits above are not used; the actual values come from the AudioSpecificConfig
      const aac = handle_aac(reader);
      if (aac == null) { return null; }
      return { kind: 'Audio', timestamp: message.timestamp, ... aac };
    }
    default:
      return null;
  }
}

const handle_amf0_data = (message: AMF0DataMessage): ActionScriptData | null => {
  const values = read_amf0(message.data);
  return { kind: 'Data', timestamp: message.timestamp, values };
}

const handle_rtmp_payload = (message: Message): VideoData | AudioData | ActionScriptData | null => {
  switch (message.message_type_id) {
    case MessageType.Video: return handle_video(message as VideoMessage);
    case MessageType.Audio: return handle_audio(message as AudioMessage);
    case MessageType.DataAMF0: return handle_amf0_data(message as AMF0DataMessage);
    default: return null;
  }
}

class Segment {
  private beginTimestamp: number;
  private endTimestamp: number | null = null;
  private media: Buffer[] = [];

  public constructor(timestamp: number) {
    this.beginTimestamp = timestamp;
  }

  public begin(): number {
    return this.beginTimestamp;
  }

  public extinf(): number | null {
    if (this.endTimestamp == null) { return null; }
    return this.endTimestamp - this.beginTimestamp;
  }

  public write(writable: Writable): void {
    if (this.extinf() != null) {
      for (const data of this.media) {
        writable.write(data);
      }
    }

    writable.end();
    return;
  }

  public feed(data: Buffer): void {
    this.media.push(data);
  }

  public complete(timestamp: number): void {
    if (this.extinf() != null) { return; }
    this.endTimestamp = timestamp;
  }
}

const MINIMUM_LIVE_WINDOW_LENGTH = 3;

type MediaPlaylistOption = {
  liveWindowLength: number;
  orphanedWindowLength: number;
  minimumSegmentDuration: number;
};

const MediaPlaylistOption = {
  from(option?: Partial<MediaPlaylistOption>): MediaPlaylistOption {
    return {
      ... option,
      liveWindowLength: Math.max(option?.liveWindowLength ?? MINIMUM_LIVE_WINDOW_LENGTH, MINIMUM_LIVE_WINDOW_LENGTH),
      orphanedWindowLength: Math.max(option?.orphanedWindowLength ?? MINIMUM_LIVE_WINDOW_LENGTH, MINIMUM_LIVE_WINDOW_LENGTH),
      minimumSegmentDuration: Math.max(0, option?.minimumSegmentDuration ?? 0),
    };
  },
};

class MediaPlaylist {
  public published: Promise<boolean>;
  private publishedNotify: (success: boolean) => void;

  private sequenceNumber: number = -1;
  private orphanedNumber: number;

  private liveWindowLength: number;
  private orphanedWindowLength: number;

  private currentSegment: Segment | null = null;
  private segmentMap: Map<number, Segment> = new Map<number, Segment>();
  private orphanedMap: Map<number, Segment> = new Map<number, Segment>();

  private minimumSegmentDuration: number;
  private targetduration: number | null = null;

  public constructor(opt?: Partial<MediaPlaylistOption>) {
    const option = MediaPlaylistOption.from(opt);
    this.liveWindowLength = option.liveWindowLength;
    this.orphanedWindowLength = option.orphanedWindowLength;
    this.orphanedNumber = this.sequenceNumber - option.liveWindowLength;
    this.minimumSegmentDuration = option.minimumSegmentDuration;

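    // Promise.withResolvers is an ES2024 feature, so a recent runtime (e.g. Node.js 22 or later) is needed
    const { promise: published, resolve: publishedNotify } = Promise.withResolvers<boolean>();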
    this.published = published;
    this.publishedNotify = publishedNotify;
  }

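  public append(timestamp: number): void {
    // Called at each key frame (timestamp in seconds). Keep filling the current segment
    // until it is at least minimumSegmentDuration long.
    if (this.currentSegment != null && (timestamp - this.currentSegment.begin()) < this.minimumSegmentDuration) {
      return;
    }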

    this.currentSegment?.complete(timestamp);
    if (this.currentSegment) {
      this.segmentMap.set(this.sequenceNumber, this.currentSegment);
    }
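    // EXT-X-TARGETDURATION is fixed from the first completed segment;
    // later segments are assumed not to exceed it.
    if (this.currentSegment?.extinf() != null && this.targetduration == null) {
      this.targetduration = Math.ceil(this.currentSegment.extinf()!);
    }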
    if (this.sequenceNumber - this.liveWindowLength >= 0) {
      const sequenceNumber = this.sequenceNumber - this.liveWindowLength;
      const segment = this.segmentMap.get(sequenceNumber);

      if (segment) {
        this.orphanedMap.set(sequenceNumber, segment);
        this.segmentMap.delete(sequenceNumber);
      }
    }
    if (this.orphanedNumber - this.orphanedWindowLength >= 0) {
      const sequenceNumber = this.orphanedNumber - this.orphanedWindowLength;
      this.orphanedMap.delete(sequenceNumber);
    }

    this.sequenceNumber += 1;
    this.orphanedNumber += 1;
    this.currentSegment = new Segment(timestamp);

    if (this.segmentMap.size >= MINIMUM_LIVE_WINDOW_LENGTH) {
      this.publishedNotify(true);
    }
  }

  public feed(data: Iterable<Buffer>): void {
    for (const buffer of data) { this.currentSegment?.feed(buffer); }
  }

  public m3u8(): string {
    let m3u8 = '';
    m3u8 += '#EXTM3U\n';
    m3u8 += '#EXT-X-VERSION:3\n';
    m3u8 += `#EXT-X-MEDIA-SEQUENCE:${Math.max(0, this.sequenceNumber - this.liveWindowLength)}\n`;
    m3u8 += `#EXT-X-TARGETDURATION:${this.targetduration}\n`;
    m3u8 += '\n';
    for (let i = Math.max(0, this.sequenceNumber - this.liveWindowLength); i < this.sequenceNumber; i++) {
      const segment = this.segmentMap.get(i)!;
      // EXTINF has the form <duration>,[<title>]; the comma is required even when the title is empty
      m3u8 += `#EXTINF:${segment.extinf()!.toFixed(6)},\n`;
      m3u8 += `${i}.ts\n`;
    }

    return m3u8;
  }

  public segment(msn: number, writable: Writable, cb?: (found: boolean) => void): void {
    // The in-progress segment is checked first, then the live window, then recently expired segments.
    const segment = this.sequenceNumber === msn
      ? this.currentSegment
      : (this.segmentMap.get(msn) ?? this.orphanedMap.get(msn));
    if (segment == null) {
      cb?.(false);
      return;
    }

    cb?.(true);
    segment.write(writable);
  }
}

const PMT_PID = 256;
const PCR_PID = 257;
const AVC_PID = 258;
const AAC_PID = 259;
const PAT_DATA = {
  transport_stream_id: 0,
  programs: [{
    program_number: 1,
    program_map_PID: PMT_PID,
  }],
} as const satisfies PAT;
const PMT_DATA = {
  program_number: 1,
  pcr_pid: PCR_PID,
  streams: [{
    stream_type: StreamType.AVC,
    elementary_PID: AVC_PID,
  }, {
    stream_type: StreamType.AAC_ADTS,
    elementary_PID: AAC_PID,
  }],
} as const satisfies PMT;
// Interval for re-sending PAT/PMT/PCR, in RTMP timestamp units (milliseconds)
const emit_PSI_interval = 100;

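// RTMP timestamps are in milliseconds; MPEG-TS PTS/DTS use a 90 kHz clock (90 ticks per millisecond).
const timestamp_from_rtmp_to_mpegts = (timestamp: number): number => {
  return timestamp * 90;
};

// HLS durations (EXTINF) are expressed in seconds.
const timestamp_from_rtmp_to_hls = (timestamp: number): number => {
  return timestamp / 1000;
};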

class HLSGenerator {
  private avcDecoderConfigurationRecord: AVCDecoderConfigurationRecord | null = null;
  private audioSpecificConfig: AudioSpecificConfig | null = null;
  private patPacketizer = new SectionPacketizer(0);
  private pmtPacketizer = new SectionPacketizer(PMT_PID);
  private pcrPacketizer = new PCRPacketizer(PCR_PID);
  private avcPacketizer = new PESPacketizer(AVC_PID);
  private aacPacketizer = new PESPacketizer(AAC_PID);
  private last_emit_PSI_timestamp: number | null = null;

  private manifest: MediaPlaylist;

  public constructor(liveWindowLength: number = 3) {
    this.manifest = new MediaPlaylist({ liveWindowLength });
  }

  public published(): Promise<boolean> {
    return this.manifest.published;
  }

  public segment(msn: number, writable: Writable, cb?: (found: boolean) => void): void {
    return this.manifest.segment(msn, writable, cb);
  }

  public m3u8(): string {
    return this.manifest.m3u8();
  }

  public feed(message: Message): void {
    if (this.last_emit_PSI_timestamp != null && (message.timestamp - this.last_emit_PSI_timestamp) >= emit_PSI_interval) {
      this.manifest.feed([
        ... this.patPacketizer.packetize(write_pat(PAT_DATA)),
        ... this.pmtPacketizer.packetize(write_pmt(PMT_DATA)),
        this.pcrPacketizer.packetize(timestamp_from_rtmp_to_mpegts(message.timestamp)),
      ]);
      this.last_emit_PSI_timestamp = message.timestamp;
    }

    const payload = handle_rtmp_payload(message);
    if (payload == null) { return; }

    switch (payload.kind) {
      case 'Video':
        if (payload.codec !== 'AVC') { return; }
        if (payload.packetType === 0) {
          this.avcDecoderConfigurationRecord = read_avc_decoder_configuration_record(payload.avcDecoderConfigurationRecord);
          return;
        }
        if (payload.type === FrameType.KEY_FRAME) {
          this.manifest.append(timestamp_from_rtmp_to_hls(payload.timestamp));
          this.manifest.feed([
            ... this.patPacketizer.packetize(write_pat(PAT_DATA)),
            ... this.pmtPacketizer.packetize(write_pmt(PMT_DATA)),
            this.pcrPacketizer.packetize(timestamp_from_rtmp_to_mpegts(payload.timestamp)),
          ]);
          this.last_emit_PSI_timestamp = payload.timestamp;
        }
        break;
      case 'Audio':
        if (payload.codec !== 'AAC') { return; }
        if (payload.packetType === 0) {
          this.audioSpecificConfig = read_audio_specific_config(payload.audioSpecificConfig);
          return;
        }
        break;
      default:
        return;
    }

    switch (payload.kind) {
      case 'Video':
        if (this.avcDecoderConfigurationRecord == null) { return; }
        this.manifest.feed(this.avcPacketizer.packetize(
          write_pes(
            write_annexb_avc(payload.data, this.avcDecoderConfigurationRecord),
            0xe0, // 0b1110XXXX: H.262/H.263/H.264/H.265 Video, stream number = 0
            timestamp_from_rtmp_to_mpegts(payload.timestamp + payload.compositionTimeOffset),
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            true,
          ),
        ));
        break;
      case 'Audio':
        if (this.audioSpecificConfig == null) { return; }
        this.manifest.feed(this.aacPacketizer.packetize(
          write_pes(
            write_adts_aac(payload.data, this.audioSpecificConfig),
            0xc0, // 0b110XXXXX: Audio (AAC here), stream number = 0
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            null,
            false,
          ),
        )); // For AAC, use true if the stream is audio-only
        break;
    }
  }
}

let rtmp_to_hls: HLSGenerator | null = null;
const handle = async (connection: Duplex) => {
  try {
    for await (const message of handle_rtmp(connection, app, streamKey, bandwidth)) {
      if (rtmp_to_hls == null) { rtmp_to_hls = new HLSGenerator(3); }
      rtmp_to_hls.feed(message);
    }
  } catch (e) {
    console.error(e);
  } finally {
    rtmp_to_hls = null;
  }
};

const rtmp_server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle(connection);
});
rtmp_server.listen(port);

const web_server = http.createServer(async (req, res) => {
  const notfound = () => {
    res.writeHead(404, { 'access-control-allow-origin': '*' });
    res.end();
  };

  if (req.url == null) {
    notfound();
    return;
  }
  const url = new URL(req.url, `http://localhost:${web}`);
  if (!(req.method === 'GET' && url.pathname.startsWith(`/${app}/${streamKey}/`))) {
    notfound();
    return;
  }
  if (rtmp_to_hls == null) {
    notfound();
    return;
  }

  // Remainder of the path after /{app}/{streamKey}/, e.g. playlist.m3u8 or 12.ts
  const prefix = url.pathname.slice(`/${app}/${streamKey}/`.length);
  if (prefix === 'playlist.m3u8') {
    // Keep a local reference: the module-level rtmp_to_hls may be reset to null while awaiting.
    const generator = rtmp_to_hls;
    const published = await generator.published();
    if (!published) {
      notfound();
      return;
    }

    res.writeHead(200, {
      'content-type': 'application/vnd.apple.mpegurl',
      'access-control-allow-origin': '*',
    });
    res.end(generator.m3u8());
    return;
  }
  if (prefix.endsWith('.ts') && !Number.isNaN(Number.parseInt(prefix.slice(0, -3), 10))) {
    const index = Number.parseInt(prefix.slice(0, -3), 10);

    rtmp_to_hls.segment(index, res, (found: boolean) => {
      if (!found) {
        res.writeHead(404, {
          'access-control-allow-origin': '*',
        });
        res.end();
      } else {
        res.writeHead(200, {
          'content-type': 'video/mp2t',
          'access-control-allow-origin': '*',
        });
      }
    });
    return;
  }

  notfound();
});
web_server.listen({ port: web });
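
The sliding-window behavior of `MediaPlaylist.m3u8()` may be easier to follow in isolation. The following is a minimal sketch, not part of the program above: `renderLivePlaylist` and its parameters are hypothetical names introduced only for illustration, and it assumes segments are numbered from 0 and named `<number>.ts`, as in the listing.

```typescript
// Hypothetical helper (an assumption for illustration, not the article's code):
// renders a live media playlist from the durations, in seconds, of every
// segment completed so far. Only the newest liveWindowLength segments are listed.
const renderLivePlaylist = (
  durations: number[],
  liveWindowLength: number,
  targetDuration: number,
): string => {
  // Sequence number of the oldest segment still inside the live window.
  const first = Math.max(0, durations.length - liveWindowLength);
  const lines: string[] = [
    '#EXTM3U',
    '#EXT-X-VERSION:3',
    `#EXT-X-TARGETDURATION:${targetDuration}`,
    `#EXT-X-MEDIA-SEQUENCE:${first}`,
  ];
  durations.slice(first).forEach((duration, offset) => {
    // EXTINF is "<duration>,[<title>]"; the title is left empty here.
    lines.push(`#EXTINF:${duration.toFixed(6)},`);
    lines.push(`${first + offset}.ts`);
  });
  return lines.join('\n') + '\n';
};

// Five completed 4-second segments with a window of three:
// the playlist lists 2.ts, 3.ts, and 4.ts, and EXT-X-MEDIA-SEQUENCE is 2.
const example = renderLivePlaylist([4, 4, 4, 4, 4], 3, 4);
console.log(example);
```

As older segments leave the window, `EXT-X-MEDIA-SEQUENCE` increases by one for each segment removed from the top of the playlist, which is how a player can tell which entries are new since its last reload.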