PLAY DEVELOPERS BLOG

HuluやTVerなどの日本最大級の動画配信を支える株式会社PLAYが運営するテックブログです。

HuluやTVerなどの日本最大級の動画配信を支える株式会社PLAYが運営するテックブログです。

動画配信技術を実装して理解する〜MPEG-TS トランスマックス編〜

こんにちは、PLAY CLOUD本部 技術推進室 の石川です。「動画配信技術を実装して理解する」も3回目の記事となりました。 前回の記事はこちらになります。

developers.play.jp

今回は、前回のHTTP-FLV配信のベースに対して、MPEG-TSにトランスマックスし、HTTP-TS配信を実装します。 本当はHLSにして配信するまでをまとめて書きたかったのですが、HLSで使うMPEG-TSへのトランスマックス処理だけで、非常に長くなってしまいました。 このため、FLVからMPEG-TSへのトランスマックス単体で記事にしたという次第です。

トランスマックスについての基礎知識は、以前執筆した以下の記事もご参照いただければと思います。 developers.play.jp

では、RTMPで使われているFLVからHLSで使うMPEG-TSにトランスマックスし、ストリームをHTTPで再生するまでを実際に実装して理解してみましょう。

MPEG-TSとは?

そもそも、MPEG-TS(MPEG Transport Stream)とはなんでしょうか? MPEG-TSは主に放送向けで使われるコンテナで、HLS配信でのコンテナとして見かける方も多いでしょう。

MPEG-TSの特徴としては、放送向けであるためエラー耐性があり、データの欠落時に容易に復帰が可能な点が挙げられます。 TSパケットという188byteのパケット単位でデータを多重化して伝送するため、 データが欠落しても、次のパケットから読み込みを復帰させることができ、迅速にエラーから復帰が可能です。

PID (Packet IDentifier)

MPEG-TSでTSパケットという単位で多重化する際、各パケットが何を伝達しているかを識別しなければなりません。 このため、パケットにはPID(Packet IDentifier)というものが割り当てられます。 このPIDは内部的には13bitの数値で表現されます。これにより、PID単位でストリームを分割して多重化できるというわけです。

例えば、構成するH.264やAACのストリームに対して、各々1つのPIDを割り当てます。 このサービスに対するストリームのPIDの割り当ては後述するPMTを使って定義します。 受信機側はPID単位で分離して、各々のパケットからストリームを復元します。

また、システムで定義されたPSI/SIには固定のPIDが割り当てられている場合があります。 例えば、最初のエントリーであるPATはPIDが0で固定となっています。

PCR (Program Clock Reference)

PCRはMPEG-TSの基準時刻を管理するデータになります。受信機はPCRの情報により内部クロックを管理します。 このクロック管理によって、デコーダへのデータ投入のレートを保証でき、送信側が意図した最大バッファサイズを守ることができます。 要するに、送信側が期待するデコーダの時刻モデルを、デコーダ側が再現して守るために使われています。

ただし、HLS配信においてはバッファ量はHTTPの受信速度やABRによるビットレートスイッチによって勝手に変わってしまいます。 これでは時刻モデルなんて成り立たないので、HLSではPCRの存在自体を考慮しないプレイヤーもあったりします。

Section

MPEG-TSでは、番組情報などの動画に付随するメタデータ(PSI/SI)をSectionという形で伝送します。 後述するPATやPMTなどは、このSectionというデータ構成で送られます。

PAT (Program Association Table)

PATは、プログラム(サービスとも言います)を構成するPMTの一覧を持つSectionです。 PATにはプログラム番号(サービスID)とそれを構成するPMTのPIDという2つ組のリストが格納されています。

PATはPIDが0で固定されており、エントリーの役割を果たしています。 まず、受信機はPATを探して、そこから対象のPMTのPIDを検索し構成情報を取得します。

PMT (Program Map Table)

PMTは、1つのプログラム(サービス)を構成する映像や音声といった各ストリームの詳細情報を示すSectionです。

PMTには、各ストリームのコーデック種別(映像がH.264、音声がAACなど)と、そのストリームを識別するためのPIDのペアが格納されています。 また、前述したPCRのPIDもこのPMTに記載されます。 受信機はPMTを読むことで、どのPIDのデータをどの種別のデコーダに渡せばよいかを判断し、再生の準備を行います。

PES (Packetized Elementary Stream)

基本的にコーデックは、PESという単位でMPEG-TS上で伝送されます。

PESは、PacketizeされたElementary Streamを指します。 Elementary Streamをざっくりいうと、コーデックのデータがそのまま並んでいるもの(ストリーム)と考えてください。

PESは、Elementary Streamを再生時間に基づいて分割し、PTS(Presentation Time Stamp)やDTS(Decoding Time Stamp)といった時刻情報を付与したものになります。 これにより、各データがいつデコードされ、いつ表示されるべきかを正確に制御できます。

FLV から MPEG-TS へのトランスマックスの流れ

FLVとMPEG-TSではコンテナが異なり、コーデックの伝送方法も異なります。 FLVは、ほぼRTMPでしか使われず、MPEG-TSは放送やHLSで使われているため、HLS配信を考えるとMPEG-TSにしたいところです。

まず、それぞれのコンテナでコーデックがどのように運ばれるのか箇条書きで表すと、このような形になります。

  • FLV
    • AAC: AudioSpecificConfigで初期化情報を伝送
    • AVC (H.264): AVCDecoderConfigurationRecordで初期化情報を伝送
  • MPEG-TS
    • AAC: ADTSヘッダを各フレームに付与
    • AVC (H.264): AnnexB形式(スタートコードでNALUを区切る)で伝送

なぜコーデック自体の伝送方法が違うのかというと、FLVはファイルベースのフォーマットであり、 同じ初期化情報を初回だけ伝送するようにしてファイルサイズを減らすという前提があるのに対して、 MPEG-TSはストリーム処理をベースとしているため、いつ読んでも読み取れるように周期的に初期化するという前提があるためです。

ですので、FLVからMPEG-TSへのトランスマックスは、主に以下の処理を行います。

  1. FLVから映像・音声のヘッダーを解析する
  2. AACのAudioSpecificConfigをADTSヘッダに変換する。
  3. AVC(H.264)のAVCDecoderConfigurationRecordをAnnexB形式に変換する。
  4. PAT、PMTといったSectionと、映像・音声データを入れるPESを生成する。
  5. 上記で生成したSecitonとPESを188バイトのTSパケットに分割(パケタイズ)する。

これらの処理を行うことで、FLVからMPEG-TSへトランスマックスできます。 図として表すと、以下のような処理フローになります。

FLVからMPEG-TSへのトランスマックスの全体図

なんか難しそうですが、一つ一つ追っていきましょう。

ユーティリティの定義

まず、MPEG-TSへのトランスマックス処理を楽にするために、bit列を操作するユーティリティを作成します。 今回は読み書き両方とも必要なので、BitBuilderとBitReaderというユーティリティを定義します。

なぜ、このようなクラスを作るのかというと、トランスマックス処理ではバイト列をbit単位で読み取ったり、bit単位でデータを書くことが非常に多くなります。また、MPEG-TSのヘッダ情報などは、仕様上ビット単位で細かくフィールドが定義されており、仕様に沿って厳密に読み書きすることが重要になります。

このようなbit列操作のユーティリティがあると、厳密に実装するのが容易になりますので作っておくことをおすすめします。

BitBuilder (クリックで展開)

bit-builder.mts

export default class BitBuilder {
  private bits: number[] = [];
  private data: number[] = [];

  private fill(): void {
    while (this.bits.length >= 8) {
      let datum = 0;
      for (let i = 0; i < 8; i++) {
        datum = datum * 2 + this.bits.shift()!;
      }
      this.data.push(datum);
    }
  }

  public writeBits(value: number, length: number): void {
    for (let i = length - 1; i >= 0; i--) {
      this.bits.push(Math.floor((value % (2 ** (i + 1))) / (2 ** i)));
    }
    this.fill();
  }

  public writeByte(value: number): void {
    this.writeBits(value, 8);
  }

  public writeBuffer(value: Buffer): void {
    for (const byte of value) { this.writeByte(byte); }
  }

  public build(): Buffer {
    const values = [... this.data];

    if (this.bits.length > 0) {
      let datum = 0;
      for (let i = 0; i < 8; i++) {
        datum = datum * 2 + (this.bits[i] ?? 0);
      }
      values.push(datum);
    }

    return Buffer.from(values);
  }
}

BitReader (クリックで展開)

bit-reader.mts

import ByteReader from './byte-reader.mts';

export default class BitReader {
  private bits: number[];
  private reader: ByteReader;

  public constructor(data: Buffer) {
    this.bits = [];
    this.reader = new ByteReader(data);
  }

  public isEOF(): boolean {
    return this.reader.isEOF() && this.bits.length === 0;
  }

  private fill(): void {
    const byte = this.reader.readU8();
    for (let i = 7; i >= 0; i--) {
      this.bits.push((byte >> i) & 1);
    }
  }

  private shift(): number {
    if (this.isEOF()) { throw new Error('EOF Exception'); }
    if (this.bits.length === 0) { this.fill(); }
    return this.bits.shift()!;
  }

  public skipBits(length: number): void {
    while (length > 0) {
      this.shift();
      length -= 1;
    }
  }

  public readBits(length: number): number {
    let bits = 0;
    while (length > 0) {
      bits = bits * 2 + this.shift();
      length -= 1;
    }
    return bits;
  }

  public readBool(): boolean {
    return this.readBits(1) === 1;
  }
}

Messageのパース処理

まず、FLVに含まれる映像・音声データを解析し、データやヘッダー情報を抜き出します。 今回は簡単のため、H.264/AACを対象としてMessageの中身をパースして処理します。 FLVの仕様通りにパースすれば、特に難しいところはありません。

FLVの仕様で決められているデータ構造は以下のようになっています。 AudioData/VideoDataの部分に、それぞれコーデックをラップしたAACAUDIODATA/AVCVIDEOPACKETが入ります。 ちなみに、AACの場合は伝送されるAudioSpecificConfigの情報を用いることが決められており、この領域のメタ情報は使いません。

FLVの映像・音声のメッセージのデータ構造

コードとして書くと、このようになります。

rtmp-handler.mts

import { MessageType } from './message-reader.mts';
import type { Message } from '.message-reader.mts';
import ByteReader from './byte-reader.mts';
import read_amf0 from './amf0-reader.mts';

type VideoMessage = Message & {
  message_type_id: (typeof MessageType.Video);
};
type AudioMessage = Message & {
  message_type_id: (typeof MessageType.Audio);
};
type AMF0DataMessage = Message & {
  message_type_id: (typeof MessageType.DataAMF0);
};

export const VideoCodecType = {
  AVC: 7,
} as const;

export const AudioCodecType = {
  AAC: 10,
} as const;

export const FrameType = {
  KEY_FRAME: 1,
  INTER_FRAME: 2,
  DISPOSABLE_INTER_FRAME: 3,
  GENERATED_KEY_FRAME: 4,
  VIDEO_INFO_OR_COMMAND: 5,
} as const;

export type AVCData = {
  codec: 'AVC',
} & ({
  packetType: 0,
  avcDecoderConfigurationRecord: Buffer;
} | {
  packetType: 1,
  compositionTimeOffset: number;
  data: Buffer;
});
export type VideoData = AVCData & {
  timestamp: number;
  type: number;
  kind: 'Video';
};

export type AACData = {
  codec: 'AAC',
} & ({
  packetType: 0;
  audioSpecificConfig: Buffer;
} | {
  packetType: 1;
  data: Buffer;
});
export type AudioData = AACData & {
  timestamp: number;
  kind: 'Audio';
};

export type ActionScriptData = {
  timestamp: number;
  kind: 'Data',
  values: any[];
};

const handle_avc = (reader: ByteReader): AVCData | null => {
  const packetType = reader.readU8();
  const compositionTimeOffset = reader.readI24BE();

  switch (packetType) {
    case 0: return {
      codec: 'AVC',
      packetType,
      avcDecoderConfigurationRecord: reader.read(),
    };
    case 1: return {
      codec: 'AVC',
      packetType,
      compositionTimeOffset,
      data: reader.read(),
    };
    default:
      return null;
  }
};

export const handle_video = (message: VideoMessage): VideoData | null => {
  const reader = new ByteReader(message.data);

  const meta = reader.readU8();
  const type = (meta & 0xF0) >> 4;
  const codec = (meta & 0x0F) >> 0;

  switch (codec) {
    case VideoCodecType.AVC: {
      const avc = handle_avc(reader);
      if (avc == null) { return null; }
      return { kind: 'Video', type, timestamp: message.timestamp, ... avc };
    }
    default:
      return null;
  }
};

const handle_aac = (reader: ByteReader): AACData | null => {
  const packetType = reader.readU8();
  switch (packetType) {
    case 0: return {
      codec: 'AAC',
      packetType,
      audioSpecificConfig: reader.read(),
    };
    case 1: return {
      codec: 'AAC',
      packetType,
      data: reader.read(),
    };
    default:
      return null;
  }
};

export const handle_audio = (message: AudioMessage): AudioData | null => {
  const reader = new ByteReader(message.data);

  const meta = reader.readU8();
  const codec = (meta & 0xF0) >> 4;
  const rate = (meta & 0x0C) >> 2;
  const size = (meta & 0x02) >> 1;
  const type = (meta & 0x01) >> 0;

  switch (codec) {
    case AudioCodecType.AAC: {
      // In AAC: rate, size, type are derived in AudioSpecificConfig
      const aac = handle_aac(reader);
      if (aac == null) { return null; }
      return { kind: 'Audio', timestamp: message.timestamp, ... aac };
    }
    default:
      return null;
  }
};

export const handle_amf0_data = (message: AMF0DataMessage): ActionScriptData | null => {
  const values = read_amf0(message.data);
  return { kind: 'Data', timestamp: message.timestamp, values };
};

export default (message: Message): VideoData | AudioData | ActionScriptData | null => {
  switch (message.message_type_id) {
    case MessageType.Video: return handle_video(message as VideoMessage);
    case MessageType.Audio: return handle_audio(message as AudioMessage);
    case MessageType.DataAMF0: return handle_amf0_data(message as AMF0DataMessage);
    default: return null;
  }
};

AudioSpecificConfigをADTSに変換する

AACデータをMPEG-TSで伝送するためには、FLV内のAudioSpecificConfig(デコーダ設定情報)を解析し、各AACフレームの先頭にADTSヘッダを付与する必要があります。

まず、AudioSpecificConfigの構成は以下になります。

AudioSpecificConfig (ISO/IEC 14496-3 より抜粋)

// AAC LC (AudioObjectType = 2) の場合の AudioSpecificConfig
AudioSpecificConfig() {
  audioObjectType; (5bit; uimsbf)
  if (audioObjectType == 31) {
    audioObjectType = 32 + audioObjectTypeExt; (6bit; uimsbf)
  }
  samplingFrequencyIndex; (4bit; bslbf)
  if (samplingFrequencyIndex == 0xf) {
    samplingFrequency; (24bit; uimsbf)
  }
  channelConfiguration; (4bit; bslbf);
  GASpecificConfig(); // 省略
}

ADTSの基本的な構成は以下になります。

ADTS

adts_frame() {
  // adts_fixed_header()
  syncword; (12bit; bslbf) = 0xFFF; // syncword is 0xFFF
  ID; (1bit; bslbf) = 1; // 1: MPEG-2 AAC, 0: MPEG-4 AAC
  layer; (2bit; uimsbf) = 0;
  protection_absent; (1bit; bslbf) = 1; // 1 is not protected
  profile_ObjectType; (2bit; uimsbf)
  sampling_frequency_index; (4bit; uimsbf)
  private_bit; (1bit; bslbf) = 0; // not used
  channel_configuration; (3bit; uimsbf)
  original_copy; (1bit; bslbf) = 0; // no copyright
  home; (1bit; bslbf) = 0; // 0: copy, 1: original, not used
  // adts_variable_header()
  copyright_identification_bit; (1bit; bslbf) = 0; // no copyright identification
  copyright_identification_start; (1bit; bslbf) = 0; // no copyright identification
  aac_frame_length; (13bit; bslbf)
  adts_buffer_fullness; (11bit; bslbf) = 0x7FF; // 0x7FF is variable rate bitstream
  number_of_raw_data_blocks_in_frame; (2bit; uimsbf) = 0;
  // payload
  raw_data_block();
}

この AudioSpecificConfig を ADTS へと変換します。 ただし、ADTS の profile_ObjectType は AudioSpecificConfig の audioObjectType - 1 となります。 sampling_frequency_indexchannel_configurationについては、ADTSで定義されている範囲においてはAudioSpecificConfigと同一です。

注意点を気をつけて AAC を AudioSpecificConfig から ADTS へ変換するコードは、以下のようになります。

aac.mts

import BitBuilder from './bit-builder.mts';
import BitReader from './bit-reader.mts';

const samplingFrequencyTable = [
  96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350,
] as const;

export type AudioSpecificConfig = {
  audioObjectType: number;
  samplingFrequencyIndex: number;
  samplingFrequency: number;
  channelConfiguration: number;
};

export const read_audio_specific_config = (audioSpecificConfig: Buffer): AudioSpecificConfig => {
  const reader = new BitReader(audioSpecificConfig);

  let audioObjectType = reader.readBits(5);
  if (audioObjectType === 31) { audioObjectType = 32 + reader.readBits(6); }

  const samplingFrequencyIndex = reader.readBits(4);
  const samplingFrequency = samplingFrequencyIndex === 0x0f ? reader.readBits(24) : samplingFrequencyTable[samplingFrequencyIndex];
  const channelConfiguration = reader.readBits(4);

  return {
    audioObjectType,
    samplingFrequencyIndex,
    samplingFrequency,
    channelConfiguration,
  };
};

export const write_adts_aac = (data: Buffer, audioSpecificConfig: AudioSpecificConfig): Buffer => {
  const builder = new BitBuilder();

  const { audioObjectType, samplingFrequencyIndex, channelConfiguration } = audioSpecificConfig;
  const frameLength = 7 + data.byteLength;

  builder.writeBits(0xFFF, 12); // syncword
  builder.writeBits(1, 1); // mpeg_version
  builder.writeBits(0, 2); // layer
  builder.writeBits(1, 1); // protection_absent (protected = 0)
  builder.writeBits(audioObjectType - 1, 2); // profile
  builder.writeBits(samplingFrequencyIndex, 4); // sampling_frequency_index
  builder.writeBits(0, 1); // private_bit
  builder.writeBits(channelConfiguration, 3); // channel_configuration
  builder.writeBits(0, 1); // original/copy
  builder.writeBits(0, 1); // home
  builder.writeBits(0, 1); // copyright_identification_bit
  builder.writeBits(0, 1); // copyright_identification_start
  builder.writeBits(frameLength, 13); // frame_length
  builder.writeBits(0x7FF, 11); // adts_buffer_fullness
  builder.writeBits(0, 2); // number_of_raw_data_blocks_in_frame

  return Buffer.concat([builder.build(), data]);
};

AVCDecoderConfigurationRecordをAnnexBに変換

AVC(H.264)の場合、FLV内のAVCDecoderConfigurationRecordに含まれるSPS(Sequence Parameter Set)とPPS(Picture Parameter Set)を抽出し、AnnexB形式に変換します。

まず、AVCDecoderConfigurationRecord の構成は以下になります。

AVCDecoderConfigurationRecord (ISO/IEC 14496-15 より抜粋)

aligned(8) class AVCDecoderConfigurationRecord {
  unsigned int(8) configurationVersion = 1;
  unsigned int(8) AVCProfileIndication;
  unsigned int(8) profile_compatibility;
  unsigned int(8) AVCLevelIndication;
  bit(6) reserved = ‘111111’b;
  unsigned int(2) lengthSizeMinusOne;
  bit(3) reserved = ‘111’b;
  unsigned int(5) numOfSequenceParameterSets;
  for (i=0; i< numOfSequenceParameterSets; i++) {
    unsigned int(16) sequenceParameterSetLength ;
    bit(8*sequenceParameterSetLength) sequenceParameterSetNALUnit;
  }
  unsigned int(8) numOfPictureParameterSets;
  for (i=0; i< numOfPictureParameterSets; i++) {
    unsigned int(16) pictureParameterSetLength;
    bit(8*pictureParameterSetLength) pictureParameterSetNALUnit;
  }
  if( AVCProfileIndication  ==  100  ||  AVCProfileIndication  ==  110  ||
     AVCProfileIndication  ==  122  ||  AVCProfileIndication  ==  144 )
  {
    bit(6) reserved = '111111'b;
    unsigned int(2) chroma_format;
    bit(5) reserved = '11111'b;
    unsigned int(3) bit_depth_luma_minus8;
    bit(5) reserved = '11111'b;
    unsigned int(3) bit_depth_chroma_minus8;
    unsigned int(8) numOfSequenceParameterSetExt;
    for (i=0; i< numOfSequenceParameterSetExt; i++) {
      unsigned int(16) sequenceParameterSetExtLength;
      bit(8*sequenceParameterSetExtLength) sequenceParameterSetExtNALUnit;
    }
  }
}

AnnexBに変換するには、NALuのサイズのバイト数を示すlengthSizeMinusOneとSPS,PPSが必要です。 実際のAVCデータに対して、NALu単位で分割した上で、先頭にAUD(Access Unit Delimiter)、IDR(FLVのKEYFRAME)の前にSPS/PPSを挿入します。 これらのNALuに対して、それぞれスタートコード(0x00000001 または 0x000001)を先頭に付与して結合すれば、AnnexBフォーマットの出来上がりです。

このAVCDecoderConfigurationRecordをAnnexB形式に変換するコードが以下になります。

avc.mts

import ByteReader from './byte-reader.mts';

export type AVCDecoderConfigurationRecord = {
  configurationVersion: number;
  AVCProfileIndication: number;
  profile_compatibility: number;
  AVCLevelIndication: number;
  lengthSize: number;
  SequenceParameterSets: Buffer[];
  PictureParameterSets: Buffer[];
};

export const read_avc_decoder_configuration_record = (avcDecoderConfigurationRecord: Buffer): AVCDecoderConfigurationRecord => {
  const reader = new ByteReader(avcDecoderConfigurationRecord);

  const configurationVersion = reader.readU8();
  const AVCProfileIndication = reader.readU8();
  const profile_compatibility = reader.readU8();
  const AVCLevelIndication = reader.readU8();
  const lengthSize = (reader.readU8() & 0b00000011) + 1;
  const numOfSequenceParameterSets = reader.readU8() & 0b00011111;
  const SequenceParameterSets = Array.from({ length: numOfSequenceParameterSets }, () => {
    const sequenceParameterSetLength = reader.readU16BE();
    return reader.read(sequenceParameterSetLength);
  });
  const numOfPictureParameterSets = reader.readU8();
  const PictureParameterSets = Array.from({ length: numOfPictureParameterSets }, () => {
    const sequenceParameterSetLength = reader.readU16BE();
    return reader.read(sequenceParameterSetLength);
  });
  return {
    configurationVersion,
    AVCProfileIndication,
    profile_compatibility,
    AVCLevelIndication,
    lengthSize,
    SequenceParameterSets,
    PictureParameterSets,
  };
};

export const write_annexb_avc = (sizedNalus: Buffer, avcDecoderConfigurationRecord: AVCDecoderConfigurationRecord): Buffer => {
  const reader = new ByteReader(sizedNalus);

  const annexb: Buffer[] = [];
  const startcode = Buffer.from([0x00, 0x00, 0x00, 0x01]);
  let hasAUD = false;
  let hasIDR = false;
  while (!reader.isEOF()) {
    const length = reader.readUIntBE(avcDecoderConfigurationRecord.lengthSize);
    const nalu = reader.read(length);
    const naluType = nalu.readUInt8(0) & 0x1F;

    switch (naluType) {
      case 0x09: hasAUD = true; break;
      case 0x05: { // IDR
        if (hasIDR) { break; }
        for (const sps of avcDecoderConfigurationRecord.SequenceParameterSets) {
          annexb.push(startcode, sps);
        }
        for (const pps of avcDecoderConfigurationRecord.PictureParameterSets) {
          annexb.push(startcode, pps);
        }
        hasIDR = true;
        break;
      }
    }
    annexb.push(startcode, nalu);
  }
  const aud = hasAUD ? [] : [startcode, Buffer.from([0x09, 0xF0])];
  return Buffer.concat([... aud, ... annexb]);
};

SectionやPESを構築する

今回必要なSectionはPAT/PMTのみなので、PAT/PMTをモデル化してSectionのフォーマットに変換します。 SectionはCRC32による誤り検出を含むため、そのコードも用意します。

まず、PATのデータ構造は以下になります。

PATの構成

program_association_section() {
  table_id; (8bit; uimsbf)
  section_syntax_indicator; (1bit; bslbf)
  '0'; (1bit; bslbf)
  reserved; (2bit; bslbf)
  section_length; (12bit; uimsbf);
  transport_stream_id; (16bit; uimsbf)
  reserved; (2bit; uimsbf)
  version_number; (5bit; uimsbf)
  current_next_indicator; (1bit; bslbf)
  section_number; (8bit;  uimsbf)
  last_section_number; (8bit;  uimsbf)
  for (i = 0; i < N; i++) {
    program_number; (16bit; uimsbf)
    reserved; (3bit; bslbf)
    if (program_number == '0') {
      network_PID; (13bit; uimsbf)
    } else {
      program_map_PID; (13bit; uimsbf)
    }
  }
  CRC32; (32bit; rpchof)
}

PATの構成要素について箇条書きで説明します。

  • section_syntax_indicator: PATは拡張形式なので 1 です。
  • section_length: この値以後のセクションの長さ (CRC32を含む)
  • transport_stream_id: 外部で決めた、このTSのIDです
  • version_number: このセクション(PAT)に更新があったらインクリメントする値
  • current_next_indicator: 1 -> current, 0 -> next を識別するbit
  • section_number: PATは0固定
  • last_section_number: PATは0固定
  • program_number: プログラム(サービス)の識別子

次に、PMTのデータ構造は以下になります。

PMTの構成

program_map_section() {
  table_id; (8bit; uimsbf)
  section_syntax_indicator; (1bit; bslbf)
  '0'; (1bit; bslbf)
  reserved; (2bit; bslbf)
  section_length; (12bit; uimsbf);
  program_number; (16bit; uimsbf)
  reserved; (2bit; uimsbf)
  version_number; (5bit; uimsbf)
  current_next_indicator; (1bit; bslbf)
  section_number; (8bit;  uimsbf)
  last_section_number; (8bit;  uimsbf)
  reserved; (3bit; bslbf)
  PCR_PID; (13bit; uimsbf)
  reserved; (4bit; bslbf)
  program_info_length; (12bit; uimsbf);
  for (i = 0; i < N; i++) {
    descriptor();
  }
  for (i = 0; i < N1; i++) {
    stream_type; (8bit; uimsbf)
    reserved; (3bit; bslbf)
    elementary_PID; (13bit; uimsbf)
    reserved; (4bit; bslbf)
    ES_info_length; (12bit; uimsbf);
    for (i = 0; i < N2; i++) {
      descriptor();
    }
  }
  CRC32; (32bit; rpchof)
}

PMTの構成要素について箇条書きで説明します。

  • section_syntax_indicator: PMTは拡張形式なので 1 です。
  • section_length: この値以後のセクションの長さ (CRC32を含む)
  • program_number: PATで指定されたサービスIDになります
  • version_number: このセクション(PMT)に更新があったらインクリメントする値
  • current_next_indicator: 1 -> current, 0 -> next を識別するbit
  • section_number: PMTは0固定
  • last_section_number: PMTは0固定
  • PCR_PID: このプログラムのPCRのPID
  • program_info_length: プログラム自体の情報の長さ
  • stream_type: ES(PES)の種別 (AVC,AACとかを識別する)
  • elementary_PID: プログラムに紐づくES(PES)のPID
  • ES_info_length: このES(PES)に紐づく情報の長さ

所定の値を埋めて、PAT/PMTを構築するコードは以下になります。

PMT, PAT の構築

const crc32_table = new Uint32Array(256);
for (let i = 0; i < crc32_table.length; i++) {
  let c = i << 24;
  for (let j = 0; j < 8; j++) {
    c = ((c << 1) ^ ((c & 0x80000000) ? 0x04c11db7 : 0)) >>> 0;
  }
  crc32_table[i] = c;
}
const crc32 = (data: Buffer) => {
  let crc = 0xFFFFFFFF;
  for (const datum of data) {
    crc = ((crc << 8) ^ crc32_table[((crc >>> 24) ^ datum) & 0xFF]) >>> 0;
  }
  return crc;
};

export type PAT = {
  transport_stream_id: number;
  programs: {
    program_number: number;
    program_map_PID: number;
  }[];
};

export const write_pat = (pat: PAT) => {
  const builder = new BitBuilder();

  builder.writeBits(0x00, 8); // table_id
  builder.writeBits(1, 1); // section_syntax_indicator
  builder.writeBits(0, 1); // 0
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 12); // section_length
  builder.writeBits(pat.transport_stream_id, 16); // transport_stream_id
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 5); // version_number
  builder.writeBits(0b1, 1); // current_next_indicator
  builder.writeBits(0, 8); // section_number
  builder.writeBits(0, 8); // last_section_number
  for (const { program_number, program_map_PID } of pat.programs) {
    builder.writeBits(program_number, 16); // program_number
    builder.writeBits(0b111, 3); // reserved
    builder.writeBits(program_map_PID, 13); // program_map_PID
  }
  builder.writeBits(0, 32); // CRC32
  const section = builder.build();
  section.writeUInt16BE(section.readUInt16BE(1) | ((section.byteLength - 3) & 0x0FFF), 1);
  section.writeUInt32BE(crc32(section.subarray(0, -4)), section.byteLength - 4);

  return section;
};

export type PMT = {
  program_number: number;
  pcr_pid: number;
  program_info?: Buffer;
  streams: {
    stream_type: number;
    elementary_PID: number;
    ES_info?: Buffer;
  }[];
};

export const write_pmt = (pmt: PMT) => {
  const builder = new BitBuilder();

  builder.writeBits(0x02, 8); // table_id
  builder.writeBits(1, 1); // section_syntax_indicator
  builder.writeBits(0, 1); // 0
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 12); // section_length
  builder.writeBits(pmt.program_number, 16); // program_number
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 5); // version_number
  builder.writeBits(0b1, 1); // current_next_indicator
  builder.writeBits(0, 8); // section_number
  builder.writeBits(0, 8); // last_section_number
  builder.writeBits(0b111, 3); // reserved
  builder.writeBits(pmt.pcr_pid, 13); // PCR_PID
  builder.writeBits(0b1111, 4); // reserved
  builder.writeBits(pmt.program_info?.byteLength ?? 0, 12); // program_info_length
  builder.writeBuffer(pmt.program_info ?? Buffer.from([])); // program_info
  for (const { stream_type, elementary_PID, ES_info } of pmt.streams) {
    builder.writeBits(stream_type, 8); // stream_type
    builder.writeBits(0b111, 3); // reserved
    builder.writeBits(elementary_PID, 13); // elementary_PID
    builder.writeBits(0b1111, 4); // reserved
    builder.writeBits(ES_info?.byteLength ?? 0, 12); // ES_info_length
    builder.writeBuffer(ES_info ?? Buffer.from([])); // ES_info
  }
  builder.writeBits(0, 32); // CRC32
  const section = builder.build();
  section.writeUInt16BE(section.readUInt16BE(1) | ((section.byteLength - 3) & 0x0FFF), 1);
  section.writeUInt32BE(crc32(section.subarray(0, -4)), section.byteLength - 4);

  return section;
};

PESはコーデックの内容やPTS/DTSなどのタイミング情報を送る単位です。 まず、PESのデータ構造は以下になります。

PESの構造

PES_packet() {
  packet_start_code_prefix; (24bit; bslbf);
  stream_id; (8bit; uimsbf);
  PES_packet_length; (16bit; uimsbf);
  // 今回は PTS/DTS に関係ある部分だけ記載する
  '10'; (2bit; bslbf)
  PES_scrambling_control; (2bit; bslbf);
  PES_priority; (1bit; bslbf)
  data_alignment_indicator; (1bit; bslbf);
  copyright; (1bit; bslbf);
  original_or_copy; (1bit; bslbf);
  PTS_DTS_flags; (2bit; bslbf);
  ESCR_flag; (1bit; bslbf);
  ES_rate_flag; (1bit; bslbf);
  DSM_trick_mode_flag; (1bit; bslbf);
  additional_copy_info_flag; (1bit; bslbf);
  PES_CRC_flag; (1bit; bslbf);
  PES_extension_flag; (1bit; bslbf);
  PES_header_data_length; (8bit; uimsbf)
  if (PTS_DTS_flags == '10') {
    '0010'; (4bit; bslbf)
    PTS[32.30]; (1bit; bslbf)
    marker_bit; (1bit; bslbf)
    PTS[29...15]; (15bit; bslbf)
    marker_bit; (1bit; bslbf)
    PTS[14...0]; (15bit; bslbf)
    marker_bit; (1bit; bslbf)
  }
if (PTS_DTS_flags == '11') {
    '0011'; (4bit; bslbf)
    PTS[32.30]; (1bit; bslbf)
    marker_bit; (1bit; bslbf)
    PTS[29...15]; (15bit; bslbf)
    marker_bit; (1bit; bslbf)
    PTS[14...0]; (15bit; bslbf)
    marker_bit; (1bit; bslbf)
    '0001'; (4bit; bslbf)
    DTS[32.30]; (1bit; bslbf)
    marker_bit; (1bit; bslbf)
    DTS[29...15]; (15bit; bslbf)
    marker_bit; (1bit; bslbf)
    DTS[14...0]; (15bit; bslbf)
    marker_bit; (1bit; bslbf)
  }
}

PESのそれぞれの構成要素の説明は以下になります。

  • packet_start_code_prefix: 0x000001 固定
  • stream_id: MPEG-TS の仕様で決められた stream_id をそのまま入れる
  • PES_packet_length: このフィールド以降のPES長 (0の場合は次のPESまでの不定長)
  • PTS_DTS_flags: PTS/DTSがあるかどうかのフラグそのもの
  • PES_header_data_length: PTS/DTSなどの情報の長さ
  • ※ これ以外は今回は使わないので、0で埋める

PESを構築するコードは以下になります。

PESの構築

const pes_not_contain_flags = new Set([0xBC, 0xBE, 0xBF, 0xF0, 0xF1, 0xFF, 0xF2, 0xF8]);
const pes_has_flags = (stream_id: number) => {
  return !pes_not_contain_flags.has(stream_id);
};

export const write_pes = (data: Buffer, stream_id: number, pts: number | null, dts: number | null, omit_length = false) => {
  if (pts == null) { dts = null; }

  const builder = new BitBuilder();
  const PES_header_data_length = (pts != null ? 5 : 0) + (dts != null ? 5 : 0);

  builder.writeBits(1, 24); // start_code_prefix
  builder.writeBits(stream_id, 8); // stream_id
  builder.writeBits(0, 16); // pes_length
  if (pes_has_flags(stream_id)) {
    builder.writeBits(0b10000000, 8);
    builder.writeBits(pts != null ? 0b1 : 0b0, 1); // pts present
    builder.writeBits(dts != null ? 0b1 : 0b0, 1); // dts present
    builder.writeBits(0b000000, 6);
    builder.writeBits(PES_header_data_length, 8); // PES_header_data_length
    if (pts != null) {
      const pts_binary = Buffer.alloc(5);
      pts_binary.writeUIntBE(pts, 0, 5);
      const reader = new BitReader(pts_binary);
      reader.skipBits(7);

      builder.writeBits(dts != null ? 0b0011 : 0b0010, 4);
      builder.writeBits(reader.readBits(3), 3);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
    }
    if (dts != null) {
      const dts_binary = Buffer.alloc(5);
      dts_binary.writeUIntBE(dts, 0, 5);
      const reader = new BitReader(dts_binary);
      reader.skipBits(7);

      builder.writeBits(0b0001, 4);
      builder.writeBits(reader.readBits(3), 3);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
    }
  }
  const header = builder.build();
  const pes = Buffer.concat([header, data]);
  if (!omit_length) { pes.writeUInt16BE(pes.byteLength - 6, 4); }
  return pes;
};

TSパケットへパケタイズする

SectionやPESを実際に伝送するために、TSパケットへとパケタイズします。 これは188byte単位(4bytesのヘッダー+184bytesのペイロード)に分割するという形になります。

TSパケットの構造

transport_packet() {
  sync_byte; (8bit; bslbf)
  transport_error_indicator; (1bit; bslbf);
  payload_unit_start_indicator; (1bit; bslbf);
  transport_priority; (1bit; bslbf);
  PID; (13bit; bslbf);
  transport_scrambling_control; (2bit; bslbf);
  adaptation_field_control; (2bit; bslbf);
  continuity_counter; (4bit; uimsbf);
  if (adaptation_field_control == '10' || adaptation_field_control == '11') {
    adaptation_field();
  }
  if (adaptation_field_control == '01' || adaptation_field_control == '11') {
    for (i = 0; i < N; i++) {
      data_byte();
    }
  }
}

今回は、transport_error_indicator,transport_priority,transport_scrambling_controlは使わないので0固定にします。 それ以外のうち payload_unit_start_indicator は分割した最初のパケットだけ1にし、continuity_counterはペイロードがある場合にインクリメントさせます。

基本的にパケット分割はPES/Sectionで共通ですが、スタッフィングに関わる挙動だけ違うので要注意です。 Sectionは0xFFのスタッフィングバイトで余剰バイトを埋めますが、 PESの場合はAdaptationFieldを用いてスタッフィングを行います。 このため、PESのスタッフィング時にはadaptation_field_controlでAdaptationFieldを使うようにする必要があります。

以上のような分割処理を行うコードは以下になります。

パケタイズ処理

const PACKET_SIZE = 188;
const HEADER_SIZE = 4;
const PAYLOAD_SIZE = PACKET_SIZE - HEADER_SIZE;
const SYNC_BYTE = 0x47;
const STUFFING_BYTE = 0xFF;

export class SectionPacketizer {
  private continuity_counter: number = 0;
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(section: Buffer): Buffer[] {
    const packets: Buffer[] = [];
    for (let i = 0; i < section.byteLength; i += (PAYLOAD_SIZE - (i === 0 ? 1 : 0))) {
      const packet = Buffer.alloc(PACKET_SIZE, 0xFF);
      const length = Math.min(i + (PAYLOAD_SIZE - (i === 0 ? 1 : 0)), section.byteLength) - i;

      const builder = new BitBuilder();
      builder.writeBits(SYNC_BYTE, 8);
      builder.writeBits(0, 1); // transport_err_indicator
      builder.writeBits(i === 0 ? 1 : 0, 1); // payload_unit_start_indicator
      builder.writeBits(0, 1); // transport_priority
      builder.writeBits(this.pid, 13);
      builder.writeBits(0, 2); // transport_scrambling_control
      builder.writeBits(0, 1); // adaptation_field_control (adaptation_field present)
      builder.writeBits(1, 1); // adaptation_field_control (payload present)
      builder.writeBits(this.continuity_counter, 4); // continuity_counter
      if (i === 0) { builder.writeBits(0, 8); } // pointer_field

      const header = builder.build();
      header.copy(packet, 0);
      section.copy(packet, header.byteLength, i, i + length);

      this.continuity_counter = (this.continuity_counter + 1) & 0x0F;
      packets.push(packet);
    }
    return packets;
  }
}

export class PESPacketizer {
  private continuity_counter: number = 0;
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(pes: Buffer): Buffer[] {
    const packets: Buffer[] = [];
    for (let i = 0; i < pes.byteLength; i += PAYLOAD_SIZE) {
      const packet = Buffer.alloc(PACKET_SIZE, 0xFF);
      const length = Math.min(i + PAYLOAD_SIZE, pes.byteLength) - i;
      const filler = PAYLOAD_SIZE - length;

      const builder = new BitBuilder();
      builder.writeBits(SYNC_BYTE, 8);
      builder.writeBits(0, 1); // transport_err_indicator
      builder.writeBits(i === 0 ? 1 : 0, 1); // payload_unit_start_indicator
      builder.writeBits(0, 1); // transport_priority
      builder.writeBits(this.pid, 13);
      builder.writeBits(0, 2); // transport_scrambling_control
      builder.writeBits(filler > 0 ? 1 : 0, 1); // adaptation_field_control (adaptation_field present)
      builder.writeBits(1, 1); // adaptation_field_control (payload present)
      builder.writeBits(this.continuity_counter, 4); // continuity_counter
      if (filler > 0) { builder.writeBits(filler - 1, 8); }
      if (filler > 1) { builder.writeBits(0, 8); }

      const header = builder.build();
      header.copy(packet, 0);
      pes.copy(packet, header.byteLength + Math.max(filler - 2, 0), i, i + length);

      this.continuity_counter = (this.continuity_counter + 1) & 0x0F;
      packets.push(packet);
    }
    return packets;
  }
}

export class PCRPacketizer {
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(pcr: number): Buffer {
    const packet = Buffer.alloc(PACKET_SIZE, STUFFING_BYTE);

    const builder = new BitBuilder();
    builder.writeBits(SYNC_BYTE, 8);
    builder.writeBits(0, 1); // transport_err_indicator
    builder.writeBits(0, 1); // payload_unit_start_indicator
    builder.writeBits(0, 1); // transport_priority
    builder.writeBits(this.pid, 13);
    builder.writeBits(0, 2); // transport_scrambling_control
    builder.writeBits(1, 1); // adaptation_field_control (adaptation_field_present)
    builder.writeBits(0, 1); // adaptation_field_control (payload present)
    builder.writeBits(0, 4); // continuity_counter
    builder.writeBits(PACKET_SIZE - HEADER_SIZE - 1, 8); // adaptation field length
    builder.writeBits(0x10, 8); // PCR is Present
    builder.writeBits(pcr, 33); // PCR base
    builder.writeBits(0, 6); // reserved
    builder.writeBits(0, 9); // PCR extension

    builder.build().copy(packet);
    return packet;
  }
}

トランスマックスする

では、FLVからMPEG-TSへのトランスマックスを司るクラスを定義します。

今回はコーデックのPIDは全てあらかじめ固定して送ります。ですので、音声,映像,PCR,PMTにそれぞれPIDを固定で割り当てます。 また、PAT,PMTはいつでも読み取れるようにするため周期的に入れる必要があります。今回は100ms単位で入れています。 100msなのは、一般的な運用ルールでそのように定められているためで、今回はそのルールに従います。

今までの道具立てから、トランスマックスを行うコードが以下になります。

mpegts-transmuxer.mts

import type { Message } from './message-reader.mts';

import { read_avc_decoder_configuration_record, write_annexb_avc } from './avc.mts';
import type { AVCDecoderConfigurationRecord } from './avc.mts';
import { read_audio_specific_config, write_adts_aac } from './aac.mts';
import type { AudioSpecificConfig } from './aac.mts';
import handle_rtmp_payload from './rtmp-handler.mts';
import { SectionPacketizer, PESPacketizer, PCRPacketizer, write_pat, write_pmt, write_pes } from './mpegts.mts';
import type { PAT, PMT } from './mpegts.mts';

const PMT_PID = 256;
const PCR_PID = 257;
const AVC_PID = 258;
const AAC_PID = 259;
const PAT_DATA = {
  transport_stream_id: 0,
  programs: [{
    program_number: 1,
    program_map_PID: PMT_PID,
  }],
} as const satisfies PAT;
const PMT_DATA = {
  program_number: 1,
  pcr_pid: PCR_PID,
  streams: [{
    stream_type: 0x1b, // AVC
    elementary_PID: AVC_PID,
  }, {
    stream_type: 0x0F, // AAC (ADTS)
    elementary_PID: AAC_PID,
  }],
} as const satisfies PMT;
const emit_PSI_interval = 100;

// RTMPのタイムスタンプはミリ秒(1kHz)単位だが、MPEG-TSのPTS/DTSは90kHzクロックを基準とするため90倍する
const timestamp_from_rtmp_to_mpegts = (timestamp: number): number => {
  return timestamp * 90;
};

export default class MPEGTSTransmuxer {
  private avcDecoderConfigurationRecord: AVCDecoderConfigurationRecord | null = null;
  private audioSpecificConfig: AudioSpecificConfig | null = null;
  private patPacketizer = new SectionPacketizer(0);
  private pmtPacketizer = new SectionPacketizer(PMT_PID);
  private pcrPacketizer = new PCRPacketizer(PCR_PID);
  private avcPacketizer = new PESPacketizer(AVC_PID);
  private aacPacketizer = new PESPacketizer(AAC_PID);
  private last_emit_PSI_timestamp: number | null = null;

  public* feed(message: Message): Iterable<Buffer> {
    if (this.last_emit_PSI_timestamp == null || (message.timestamp - this.last_emit_PSI_timestamp) >= emit_PSI_interval) {
      const packets = [
        ... this.patPacketizer.packetize(write_pat(PAT_DATA)),
        ... this.pmtPacketizer.packetize(write_pmt(PMT_DATA)),
        this.pcrPacketizer.packetize(timestamp_from_rtmp_to_mpegts(message.timestamp)),
      ];

      yield* packets;
      this.last_emit_PSI_timestamp = message.timestamp;
    }

    const payload = handle_rtmp_payload(message);
    if (payload == null) { return; }

    switch (payload.kind) {
      case 'Video':
        if (payload.codec !== 'AVC') { return; }
        if (payload.packetType === 0) {
          this.avcDecoderConfigurationRecord = read_avc_decoder_configuration_record(payload.avcDecoderConfigurationRecord);
          return;
        }
        break;
      case 'Audio':
        if (payload.codec !== 'AAC') { return; }
        if (payload.packetType === 0) {
          this.audioSpecificConfig = read_audio_specific_config(payload.audioSpecificConfig);
          return;
        }
        break;
      default:
        return;
    }

    switch (payload.kind) {
      case 'Video':
        if (this.avcDecoderConfigurationRecord == null) { return; }
        yield* this.avcPacketizer.packetize(
          write_pes(
            write_annexb_avc(payload.data, this.avcDecoderConfigurationRecord),
            0xe0,
            timestamp_from_rtmp_to_mpegts(payload.timestamp + payload.compositionTimeOffset),
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            true,
          ),
        );
        break;
      case 'Audio':
        if (this.audioSpecificConfig == null) { return; }
        yield* this.aacPacketizer.packetize(
          write_pes(
            write_adts_aac(payload.data, this.audioSpecificConfig),
            0xc0,
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            null,
            false,
          ),
        );
        break;
    }
  }
}

HTTP-TSで配信する

HTTPサーバによるストリーミング配信

前回のHTTP-FLVと大体同じですが、MPEG-TSは周期的に初期化情報を送るので初期化済みなどのフラグは必要なくなります。 このため、書き込みと閉じる処理だけ管理すればよくなっています。

前回と大体同じですが、コードは以下になります。

import net from 'node:net';
import http from 'node:http';
import type { Duplex } from 'node:stream';
import { parseArgs } from 'node:util';
import type { ParseArgsOptionsConfig } from 'node:util';

import handle_rtmp from './rtmp-accepter.mts';

import MPEGTSTransmuxer from './mpegts-transmuxer.mts';

const options = {
  rtmp: {
    type: 'string',
    default: '1935',
  },
  web: {
    type: 'string',
    default: '8000',
  },
  app: {
    type: 'string',
  },
  streamKey: {
    type: 'string',
  },
  highWaterMark: {
    type: 'string',
  },
  bandwidth: {
    type: 'string',
  },
} as const satisfies ParseArgsOptionsConfig;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.rtmp, 10))) {
  console.error('Please Specify valid port number'); process.exit(1);
}
if (Number.isNaN(Number.parseInt(args.web, 10))) {
  console.error('Please Specify valid port number'); process.exit(1);
}
if (args.app == null) {
  console.error('Please Specify valid app'); process.exit(1);
}
if (args.streamKey == null) {
  console.error('Please Specify valid streamKey'); process.exit(1);
}
if (args.highWaterMark != null && Number.isNaN(Number.parseInt(args.highWaterMark, 10))) {
  console.error('Please Specify valid highWaterMark'); process.exit(1);
}
if (args.bandwidth != null && Number.isNaN(Number.parseInt(args.bandwidth, 10))) {
  console.error('Please Specify valid bandwidth'); process.exit(1);
}
const port = Number.parseInt(args.rtmp, 10);
const web = Number.parseInt(args.web, 10);
const app = args.app;
const streamKey = args.streamKey;
const highWaterMark = args.highWaterMark != null ? Number.parseInt(args.highWaterMark, 10) : undefined;
const bandwidth = args.bandwidth != null ? Number.parseInt(args.bandwidth, 10) : undefined;

type StreamingInformation = [writeFn: (buffer: Buffer) => void, exitFn: () => void];
const streaming = new Map<number, StreamingInformation>();
const handle = async (connection: Duplex) => {
  const rtmp_to_mpegts = new MPEGTSTransmuxer();
  try {
    for await (const message of handle_rtmp(connection, app, streamKey, bandwidth)) {
      for (const packet of rtmp_to_mpegts.feed(message)) {
        for (const [write, _] of streaming.values()) { write(packet); }
      }
    }
  } catch (e) {
    console.error(e);
  } finally {
    for (const [_, end] of streaming.values()) { end(); }
  }
};

const rtmp_server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle(connection);
});
rtmp_server.listen(port);

let viewers = 0;
const web_server = http.createServer({ highWaterMark }, (req, res) => {
  if (req.url == null) {
    res.writeHead(404, { 'access-control-allow-origin': '*' });
    res.end();
    return;
  }
  const url = new URL(req.url, `http://localhost:${web}`);
  if (!(req.method === 'GET' && url.pathname === `/${app}/${streamKey}`)) {
    res.writeHead(404, { 'access-control-allow-origin': '*' });
    res.end();
    return;
  }
  const viewer = viewers++;
  res.writeHead(200, {
    'content-type': 'video/mp2t',
    'access-control-allow-origin': '*',
  });

  const write = (chunk: Buffer) => {
    if (res.closed) { return; }
    if (!res.write(chunk) && highWaterMark != null) { res.destroy(); }
  };

  const exit = () => { res.end(); streaming.delete(viewer); };
  const entry = [write, exit] satisfies StreamingInformation;
  streaming.set(viewer, entry);

  req.on('close', exit);
  res.on('close', exit);
  req.on('error', exit);
  res.on('error', exit);
});
web_server.listen({ port: web });

受信の仕方

HTTP-TSはffplayなどのクライアントで受信できます。MSEを使ったブラウザで受信するプレイヤーもあるようです。

今回のコードで受信する場合は、ffplayで以下のように指定します。${web}はWebサーバのポート、${app}${streamKey}は指定したアプリ名とストリームキーです。

HTTP-TS受信コマンド(ffplay)

ffplay -f mpegts http://localhost:${web}/${app}/${streamKey}

このコマンドでRTMPで打ち上げた内容と同じ内容が出力されたら、HTTP-TS配信成功です。

まとめ

今度はMPEG-TSへのトランスマックスを実装して理解するというテーマでやってみましたが、いかがでしたでしょうか。 コンテナにデータを詰める部分は、なかなかに普段触ることがないブラックボックス化しがちですが、実際にやってみると意外と簡単だと感じたのではないでしょうか? そして、MPEG-TSへのトランスマックスができれば、HLS化はMPEG-TSセグメント化するだけで可能です。 ですので、HLS化はもう目前です、長かった...。

ちなみに、今回はコード行数を数えると1600行弱でした。800行くらいトランスマックス処理にかかっています。トランスマックス処理はコード量がかさみますね。

おまけ

今回もブログ記事で紹介したコード片を1ファイルにまとめてみました。このTypeScriptのコードをファイルに貼り付ければ、Node24ならNode.jsだけで動きますので、是非試してHTTP-TS配信をやってみてください。

1ファイルまとめ版 (クリックで展開)

bundled.mts

import net from "node:net";
import http from "node:http"
import { randomBytes } from 'node:crypto'
import { Duplex, Writable } from "node:stream";
import { parseArgs } from "node:util";
import type { ParseArgsOptionsConfig } from "node:util"

type AsyncByteReaderOption = {
  signal: AbortSignal;
}

class AsyncByteReader {
  private buffers: Buffer[] = [];
  private offset = 0;
  private totals = 0;
  private eof = false;
  private promises: [number, (result: Buffer) => void, (error: Error) => void][] = [];

  public constructor(option?: Partial<AsyncByteReaderOption>) {
    option?.signal?.addEventListener('abort', this.feedEOF.bind(this), { once: true });
  }

  private fulfill(): void {
    while (this.promises.length > 0) {
      const [length, resolve] = this.promises[0];
      if (this.totals < length) { break; }

      const drained = [];
      let remains = length;
      while (this.buffers.length > 0) {
        const buffer = this.buffers[0];
        const capacity = buffer.byteLength - this.offset;

        if (capacity > remains) {
          drained.push(buffer.subarray(this.offset, this.offset + remains));
          this.offset += remains;
          break;
        }

        drained.push(buffer.subarray(this.offset));
        this.buffers.shift();
        this.offset = 0;
        remains -= capacity;
      }

      this.totals -= length;
      resolve(Buffer.concat(drained));
      this.promises.shift();
    }

    if (!this.eof) { return; }
    while (this.promises.length > 0) {
      const [,,reject] = this.promises[0];
      reject(new Error('EOF Exception'));
      this.promises.shift();
    }
  }

  public feed(buffer: Buffer): void {
    if (this.eof) { return; }
    this.buffers.push(buffer);
    this.totals += buffer.byteLength;
    this.fulfill();
  }

  public feedEOF(): void {
    this.eof = true;
    this.fulfill();
  }

  public read(size: number): Promise<Buffer> {
    const { promise, resolve, reject } = Promise.withResolvers<Buffer>();
    this.promises.push([size, resolve, reject]);
    this.fulfill();
    return promise;
  }

  public async readU8(): Promise<number> {
    const buffer = await this.read(1);
    return buffer.readUIntBE(0, 1);
  }

  public async readU16BE(): Promise<number> {
    const buffer = await this.read(2);
    return buffer.readUIntBE(0, 2);
  }

  public async readU24BE(): Promise<number> {
    const buffer = await this.read(3);
    return buffer.readUIntBE(0, 3);
  }

  public async readU32BE(): Promise<number> {
    const buffer = await this.read(4);
    return buffer.readUIntBE(0, 4);
  }

  public async readU16LE(): Promise<number> {
    const buffer = await this.read(2);
    return buffer.readUIntLE(0, 2);
  }

  public async readU24LE(): Promise<number> {
    const buffer = await this.read(3);
    return buffer.readUIntLE(0, 3);
  }

  public async readU32LE(): Promise<number> {
    const buffer = await this.read(4);
    return buffer.readUIntLE(0, 4);
  }
}

class ByteReader {
  private buffer: Buffer;
  private offset = 0;

  public constructor(buffer: Buffer) {
    this.buffer = buffer;
  }

  public isEOF(): boolean {
    return this.offset >= this.buffer.byteLength;
  }

  public read(length: number): Buffer {
    if (length < 0) { length = this.buffer.byteLength - this.offset; }
    const value = this.buffer.subarray(this.offset, this.offset + length);
    this.offset += length;
    return value;
  }

  public readUIntBE(length: number): number {
    return this.read(length).readUIntBE(0, length);
  }

  public readIntBE(length: number): number {
    return this.read(length).readIntBE(0, length);
  }

  public readU8(): number {
    return this.readUIntBE(1);
  }

  public readU16BE(): number {
    return this.readUIntBE(2);
  }

  public readU24BE(): number {
    return this.readUIntBE(3);
  }

  public readU32BE(): number {
    return this.readUIntBE(4);
  }

  public readI8(): number {
    return this.readIntBE(1);
  }

  public readI16BE(): number {
    return this.readIntBE(2);
  }

  public readI24BE(): number {
    return this.readIntBE(3);
  }

  public readI32BE(): number {
    return this.readIntBE(4);
  }

  public readF64BE(): number {
    return this.read(8).readDoubleBE(0);
  }
}

class BitReader {
  private bits: number[];
  private reader: ByteReader;

  public constructor(data: Buffer) {
    this.bits = [];
    this.reader = new ByteReader(data);
  }

  public isEOF(): boolean {
    return this.reader.isEOF() && this.bits.length === 0;
  }

  private fill(): void {
    const byte = this.reader.readU8();
    for (let i = 7; i >= 0; i--) {
      this.bits.push((byte >> i) & 1);
    }
  }

  private shift(): number {
    if (this.isEOF()) { throw new Error('EOF Exception'); }
    if (this.bits.length === 0) { this.fill(); }
    return this.bits.shift()!;
  }

  public skipBits(length: number): void {
    while (length > 0) {
      this.shift();
      length -= 1;
    }
  }

  public readBits(length: number): number {
    let bits = 0;
    while (length > 0) {
      bits = bits * 2 + this.shift();
      length -= 1;
    }
    return bits;
  }

  public readBool(): boolean {
    return this.readBits(1) === 1;
  }
}

class BitBuilder {
  private bits: number[] = [];
  private data: number[] = [];

  private fill(): void {
    while (this.bits.length >= 8) {
      let datum = 0;
      for (let i = 0; i < 8; i++) {
        datum = datum * 2 + this.bits.shift()!;
      }
      this.data.push(datum);
    }
  }

  public writeBits(value: number, length: number): void {
    for (let i = length - 1; i >= 0; i--) {
      this.bits.push(Math.floor((value % (2 ** (i + 1))) / (2 ** i)));
    }
    this.fill();
  }

  public writeByte(value: number): void {
    this.writeBits(value, 8);
  }

  public writeBuffer(value: Buffer): void {
    for (const byte of value) { this.writeByte(byte); }
  }

  public build(): Buffer {
    const values = [... this.data];

    if (this.bits.length > 0) {
      let datum = 0;
      for (let i = 0; i < 8; i++) {
        datum = datum * 2 + (this.bits[i] ?? 0);
      }
      values.push(datum);
    }

    return Buffer.from(values);
  }
}

type AVCDecoderConfigurationRecord = {
  configurationVersion: number;
  AVCProfileIndication: number;
  profile_compatibility: number;
  AVCLevelIndication: number;
  lengthSize: number;
  SequenceParameterSets: Buffer[];
  PictureParameterSets: Buffer[];
}

const read_avc_decoder_configuration_record = (avcDecderCongiurationRecord: Buffer): AVCDecoderConfigurationRecord => {
  const reader = new ByteReader(avcDecderCongiurationRecord);

  const configurationVersion = reader.readU8();
  const AVCProfileIndication = reader.readU8();
  const profile_compatibility = reader.readU8();
  const AVCLevelIndication = reader.readU8();
  const lengthSize = (reader.readU8() & 0b00000011) + 1;
  const numOfSequenceParameterSets = reader.readU8() & 0b00011111;
  const SequenceParameterSets = Array.from({ length: numOfSequenceParameterSets }, () => {
    const sequenceParameterSetLength = reader.readU16BE();
    return reader.read(sequenceParameterSetLength);
  });
  const numOfPictureParameterSets = reader.readU8();
  const PictureParameterSets = Array.from({ length: numOfPictureParameterSets }, () => {
    const sequenceParameterSetLength = reader.readU16BE();
    return reader.read(sequenceParameterSetLength);
  });
  return {
    configurationVersion,
    AVCProfileIndication,
    profile_compatibility,
    AVCLevelIndication,
    lengthSize,
    SequenceParameterSets,
    PictureParameterSets
  };
}

const write_annexb_avc = (sizedNalus: Buffer, avcDecderCongiurationRecord: AVCDecoderConfigurationRecord): Buffer => {
  const reader = new ByteReader(sizedNalus);

  const annexb: Buffer[] = [];
  const startcode = Buffer.from([0x00, 0x00, 0x00, 0x01]);
  let hasAUD = false;
  let hasIDR = false;
  while (!reader.isEOF()) {
    const length = reader.readUIntBE(avcDecderCongiurationRecord.lengthSize);
    const nalu = reader.read(length);
    const naluType = nalu.readUInt8(0) & 0x1F;

    switch (naluType) {
      case 0x09: hasAUD = true; break;
      case 0x05: { // IDR
        if (hasIDR) { break; }
        for (const sps of avcDecderCongiurationRecord.SequenceParameterSets) {
          annexb.push(startcode, sps);
        }
        for (const pps of avcDecderCongiurationRecord.PictureParameterSets) {
          annexb.push(startcode, pps);
        }
        hasIDR = true;
        break;
      }
    }
    annexb.push(startcode, nalu);
  }
  const aud = hasAUD ? [] : [startcode, Buffer.from([0x09, 0xF0])];
  return Buffer.concat([... aud, ... annexb]);
}

const samplingFrequencyTable = [
  96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050,
  16000, 12000, 11025, 8000, 7350,
] as const;

type AudioSpecificConfig = {
  audioObjectType: number;
  samplingFrequencyIndex: number;
  samplingFrequency: number;
  channelConfiguration: number;
}

const read_audio_specific_config = (audioSpecificConfig: Buffer): AudioSpecificConfig => {
  const reader = new BitReader(audioSpecificConfig);

  let audioObjectType = reader.readBits(5);
  if (audioObjectType === 31) { audioObjectType = 32 + reader.readBits(6); }

  const samplingFrequencyIndex = reader.readBits(4);
  const samplingFrequency = samplingFrequencyIndex === 0x0f ? reader.readBits(24) : samplingFrequencyTable[samplingFrequencyIndex];
  const channelConfiguration = reader.readBits(4);

  return {
    audioObjectType,
    samplingFrequencyIndex,
    samplingFrequency,
    channelConfiguration
  };
}

const write_adts_aac = (data: Buffer, audioSpecificConfig: AudioSpecificConfig): Buffer => {
  const builder = new BitBuilder();

  // TODO: AAC-LC or Main で samplingFrequencyIndex !== 0x0F なものに絞るべき
  const { audioObjectType, samplingFrequencyIndex, channelConfiguration } = audioSpecificConfig;
  const frameLength = 7 + data.byteLength;

  builder.writeBits(0xFFF, 12); // syncword
  builder.writeBits(1, 1); // mpeg_version
  builder.writeBits(0, 2); // layer
  builder.writeBits(1, 1); // protection_absent (protected = 0)
  builder.writeBits(audioObjectType - 1, 2); // profile
  builder.writeBits(samplingFrequencyIndex, 4); // sampling_frequency_index
  builder.writeBits(0, 1); // private_bit
  builder.writeBits(channelConfiguration, 3); // channel_configuration
  builder.writeBits(0, 1); // original/copy
  builder.writeBits(0, 1); // home
  builder.writeBits(0, 1); // copyright_identification_bit
  builder.writeBits(0, 1); // copyright_identification_start
  builder.writeBits(frameLength, 13); // frame_length
  builder.writeBits(0x7FF, 11); // adts_buffer_fullness
  builder.writeBits(0, 2); // number_of_raw_data_blocks_in_frame

  return Buffer.concat([builder.build(), data]);
}

const PACKET_SIZE = 188;
const HEADER_SIZE = 4;
const PAYLOAD_SIZE = PACKET_SIZE - HEADER_SIZE;
const SYNC_BYTE = 0x47;
const STUFFING_BYTE = 0xFF;

const pes_not_contain_flags = new Set([0xBC, 0xBE, 0xBF, 0xF0, 0xF1, 0xFF, 0xF2, 0xF8]);
const pes_has_flags = (stream_id: number) => {
  return !pes_not_contain_flags.has(stream_id);
}

const crc32_table = new Uint32Array(256);
for (let i = 0; i < crc32_table.length; i++) {
  let c = i << 24;
  for (let j = 0; j < 8; j++) {
    c = ((c << 1) ^ ((c & 0x80000000) ? 0x04c11db7 : 0)) >>> 0;
  }
  crc32_table[i] = c;
}
const crc32 = (data: Buffer) => {
  let crc = 0xFFFFFFFF;
  for (const datum of data) {
    crc = ((crc << 8) ^ crc32_table[((crc >>> 24) ^ datum) & 0xFF]) >>> 0;
  }
  return crc;
}

type PAT = {
  transport_stream_id: number;
  programs: {
    program_number: number;
    program_map_PID: number;
  }[];
};

const write_pat = (pat: PAT) => {
  const builder = new BitBuilder();

  builder.writeBits(0x00, 8); // table_id
  builder.writeBits(1, 1); // section_syntax_indicator
  builder.writeBits(0, 1); // 0
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 12); // section_length
  builder.writeBits(pat.transport_stream_id, 16); // transport_stream_id
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 5); // version_number
  builder.writeBits(0b1, 1); // current_next_indicator
  builder.writeBits(0, 8); // section_number
  builder.writeBits(0, 8); // last_section_number
  for (const { program_number, program_map_PID } of pat.programs) {
    builder.writeBits(program_number, 16); // program_number
    builder.writeBits(0b111, 3); // reserved
    builder.writeBits(program_map_PID, 13); // program_map_PID
  }
  builder.writeBits(0, 32); // CRC32
  const section = builder.build();
  section.writeUInt16BE(section.readUInt16BE(1) | ((section.byteLength - 3) & 0x0FFF), 1);
  section.writeUInt32BE(crc32(section.subarray(0, -4)), section.byteLength - 4);

  return section;
}

type PMT = {
  program_number: number;
  pcr_pid: number;
  program_info?: Buffer;
  streams: {
    stream_type: number;
    elementary_PID: number;
    ES_info?: Buffer;
  }[];
};

const write_pmt = (pmt: PMT) => {
  const builder = new BitBuilder();

  builder.writeBits(0x02, 8); // table_id
  builder.writeBits(1, 1); // section_syntax_indicator
  builder.writeBits(0, 1); // 0
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 12); // section_length
  builder.writeBits(pmt.program_number, 16); // program_number
  builder.writeBits(0b11, 2); // reserved
  builder.writeBits(0, 5); // version_number
  builder.writeBits(0b1, 1); // current_next_indicator
  builder.writeBits(0, 8); // section_number
  builder.writeBits(0, 8); // last_section_number
  builder.writeBits(0b111, 3); // reserved
  builder.writeBits(pmt.pcr_pid, 13); // PCR_PID
  builder.writeBits(0b1111, 4); // reserved
  builder.writeBits(pmt.program_info?.byteLength ?? 0, 12); // program_info_length
  builder.writeBuffer(pmt.program_info ?? Buffer.from([])); // program_info
  for (const { stream_type, elementary_PID, ES_info } of pmt.streams) {
    builder.writeBits(stream_type, 8); // stream_type
    builder.writeBits(0b111, 3); // reserved
    builder.writeBits(elementary_PID, 13); // elementary_PID
    builder.writeBits(0b1111, 4); // reserved
    builder.writeBits(ES_info?.byteLength ?? 0, 12); // ES_info_length
    builder.writeBuffer(ES_info ?? Buffer.from([])); // ES_info
  }
  builder.writeBits(0, 32); // CRC32
  const section = builder.build();
  section.writeUInt16BE(section.readUInt16BE(1) | ((section.byteLength - 3) & 0x0FFF), 1);
  section.writeUInt32BE(crc32(section.subarray(0, -4)), section.byteLength - 4);

  return section;
};

const write_pes = (data: Buffer, stream_id: number, pts: number | null, dts: number | null, omit_length = false) => {
  if (pts == null) { dts = null; }

  const builder = new BitBuilder();
  const PES_header_data_length = (pts != null ? 5 : 0) + (dts != null ? 5 : 0);

  builder.writeBits(1, 24); // start_code_prefix
  builder.writeBits(stream_id, 8); // stream_id
  builder.writeBits(0, 16); // pes_length
  if (pes_has_flags(stream_id)) {
    builder.writeBits(0b10000000, 8);
    builder.writeBits(pts != null ? 0b1 : 0b0, 1); // pts present
    builder.writeBits(dts != null ? 0b1 : 0b0, 1); // dts present
    builder.writeBits(0b000000, 6);
    builder.writeBits(PES_header_data_length, 8); // PES_header_data_length
    if (pts != null) {
      const pts_binary = Buffer.alloc(5)
      pts_binary.writeUIntBE(pts, 0, 5);
      const reader = new BitReader(pts_binary);
      reader.skipBits(7);

      builder.writeBits(dts != null ? 0b0011 : 0b0010, 4);
      builder.writeBits(reader.readBits(3), 3);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
    }
    if (dts != null) {
      const dts_binary = Buffer.alloc(5)
      dts_binary.writeUIntBE(dts, 0, 5);
      const reader = new BitReader(dts_binary);
      reader.skipBits(7);

      builder.writeBits(0b0001, 4);
      builder.writeBits(reader.readBits(3), 3);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
      builder.writeBits(reader.readBits(15), 15);
      builder.writeBits(0b1, 1); // marker
    }
  }
  const header = builder.build();
  const pes = Buffer.concat([header, data]);
  if (!omit_length) { pes.writeUInt16BE(pes.byteLength - 6, 4); }
  return pes
}

class SectionPacketizer {
  private continuity_counter: number = 0;
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(section: Buffer): Buffer[] {
    const packets: Buffer[] = [];
    for (let i = 0; i < section.byteLength; i += (PAYLOAD_SIZE - (i === 0 ? 1 : 0))) {
      const packet = Buffer.alloc(PACKET_SIZE, 0xFF);
      const length = Math.min(i + (PAYLOAD_SIZE - (i === 0 ? 1 : 0)), section.byteLength) - i;

      const builder = new BitBuilder();
      builder.writeBits(SYNC_BYTE, 8);
      builder.writeBits(0, 1); // transport_err_indicator
      builder.writeBits(i === 0 ? 1 : 0, 1); // payload_unit_start_indicator
      builder.writeBits(0, 1); // transport_priority
      builder.writeBits(this.pid, 13);
      builder.writeBits(0, 2); // transport_scrambling_control
      builder.writeBits(0, 1); // adaptation_field_control (adaptation_field present)
      builder.writeBits(1, 1); // adaptation_field_control (payload present)
      builder.writeBits(this.continuity_counter, 4); // continiuty_counter
      if (i === 0) { builder.writeBits(0, 8); } // pointer_field

      const header = builder.build();
      header.copy(packet, 0);
      section.copy(packet, header.byteLength, i, i + length);

      this.continuity_counter = (this.continuity_counter + 1) & 0x0F;
      packets.push(packet);
    }
    return packets;
  }
}

class PESPacketizer {
  private continuity_counter: number = 0;
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(pes: Buffer): Buffer[] {
    const packets: Buffer[] = [];
    for (let i = 0; i < pes.byteLength; i += PAYLOAD_SIZE) {
      const packet = Buffer.alloc(PACKET_SIZE, 0xFF);
      const length = Math.min(i + PAYLOAD_SIZE, pes.byteLength) - i;
      const filler = PAYLOAD_SIZE - length;

      const builder = new BitBuilder();
      builder.writeBits(SYNC_BYTE, 8);
      builder.writeBits(0, 1); // transport_err_indicator
      builder.writeBits(i === 0 ? 1 : 0, 1); // payload_unit_start_indicator
      builder.writeBits(0, 1); // transport_priority
      builder.writeBits(this.pid, 13);
      builder.writeBits(0, 2); // transport_scrambling_control
      builder.writeBits(filler > 0 ? 1 : 0, 1); // adaptation_field_control (adaptation_field present)
      builder.writeBits(1, 1); // adaptation_field_control (payload present)
      builder.writeBits(this.continuity_counter, 4); // continuity_counter
      if (filler > 0) { builder.writeBits(filler - 1, 8); }
      if (filler > 1) { builder.writeBits(0, 8); }

      const header = builder.build();
      header.copy(packet, 0);
      pes.copy(packet, header.byteLength + Math.max(filler - 2, 0), i, i + length);

      this.continuity_counter = (this.continuity_counter + 1) & 0x0F;
      packets.push(packet);
    }
    return packets;
  }
}

class PCRPacketizer {
  private pid: number;

  public constructor(pid: number) {
    this.pid = pid;
  }

  public packetize(pcr: number): Buffer {
    const packet = Buffer.alloc(PACKET_SIZE, STUFFING_BYTE);

    const builder = new BitBuilder();
    builder.writeBits(SYNC_BYTE, 8);
    builder.writeBits(0, 1); // transport_err_indicator
    builder.writeBits(0, 1); // payload_unit_start_indicator
    builder.writeBits(0, 1); // transport_priority
    builder.writeBits(this.pid, 13);
    builder.writeBits(0, 2); // transport_scrambling_control
    builder.writeBits(1, 1); // adaptation_field_control (adaptation_field_present)
    builder.writeBits(0, 1); // adaptation_field_control (payload present)
    builder.writeBits(0, 4); // continuity_counter
    builder.writeBits(PACKET_SIZE - HEADER_SIZE - 1, 8); // adaptation field length
    builder.writeBits(0x10, 8); // PCR is Present
    builder.writeBits(pcr, 33); // PCR base
    builder.writeBits(0, 6); // reserved
    builder.writeBits(0, 9); // PCR extension

    builder.build().copy(packet);
    return packet;
  }
}

const UTF8Decoder = new TextDecoder('utf-8');
const scriptend: unique symbol = Symbol();

const read_string = (reader: ByteReader): string => {
  const length = reader.readU16BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_longstring = (reader: ByteReader): string => {
  const length = reader.readU32BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_object = (reader: ByteReader): Record<string, any> => {
  const object: Record<string, any> = {};
  while (true) {
    const name = read_string(reader);
    const val = read_value(reader);
    if (val === scriptend) { return object; }
    object[name] = val;
  }
}

const read_mixedarray = (reader: ByteReader): Record<string, any>  => {
  reader.readU32BE(); // length
  return read_object(reader);
}

const read_strictarray = (reader: ByteReader): any[] => {
  const length = reader.readU32BE();
  const array = [];
  for (let i = 0; i < length; i++) {
    array.push(read_value(reader));
  }
  return array;
}

const read_date = (reader: ByteReader): Date => {
  const timestamp = reader.readF64BE();
  const localtimeoffset = reader.readI16BE();
  return new Date(timestamp);
}

const read_value = (reader: ByteReader): any => {
  const tag = reader.readU8();
  switch (tag) {
    case 0: return reader.readF64BE();
    case 1: return reader.readU8() !== 0;
    case 2: return read_string(reader);
    case 3: return read_object(reader);
    case 4: throw new Error('Unsupported Tag: 4 (movie clip)');
    case 5: return null;
    case 6: return undefined;
    case 7: throw new Error('Unsupported Tag: 7 (reference)');
    case 8: return read_mixedarray(reader);
    case 9: return scriptend;
    case 10: return read_strictarray(reader);
    case 11: return read_date(reader);
    case 12: return read_longstring(reader);
  }
  throw new Error(`Invalid tag: ${tag}`);
}

const read_amf0 = (data: Buffer): any[] => {
  const reader = new ByteReader(data);
  const result = [];
  while (!reader.isEOF()) {
    result.push(read_value(reader));
  }
  return result;
}

const UTF8Encoder = new TextEncoder();

const write_number = (data: number): Buffer => {
  const buffer = Buffer.alloc(8)
  buffer.writeDoubleBE(data, 0);
  return buffer;
}

const write_string = (data: string): Buffer => {
  const utf8 = Buffer.from(UTF8Encoder.encode(data));
  const length = Buffer.alloc(2);
  length.writeUInt16BE(utf8.byteLength);
  return Buffer.concat([length, utf8]);
}

const write_object = (obj: Record<string, any>): Buffer => {
  return Buffer.concat([... Object.entries(obj).map(([k, v]) => Buffer.concat([write_string(k), write_value(v)])), Buffer.from([0x00, 0x00, 0x09])]);
}

const write_array = (data: any[]): Buffer => {
  const length = Buffer.alloc(4);
  length.writeUInt32BE(data.length);
  return Buffer.concat([length, ... data.map((datum) => write_value(datum))]);
}

const write_date = (data: Date): Buffer => {
  const buffer = Buffer.alloc(10);
  buffer.writeDoubleBE(data.getTime(), 0);
  buffer.writeUInt16BE(0, 8);
  return buffer;
}

const write_value = (data: any): Buffer => {
  if (data === null) { return Buffer.from([0x05]); }
  if (data === undefined) { return Buffer.from([0x06]); }
  if (Array.isArray(data)) { return Buffer.concat([Buffer.from([0x0a]), write_array(data)]); }
  if (data instanceof Date) { return Buffer.concat([Buffer.from([0x0b]), write_date(data)]); }
  switch (typeof data) {
    case 'number': return Buffer.concat([Buffer.from([0x00]), write_number(data)]);
    case 'boolean': return Buffer.from([0x01, data ? 1 : 0]);
    case 'string': return  Buffer.concat([Buffer.from([0x02]), write_string(data)]);
    case 'object': return Buffer.concat([Buffer.from([0x03]), write_object(data)]);
    default: return Buffer.from([]);
  }
}

const write_amf0 = (... data: any[]): Buffer => {
  return Buffer.concat(data.map((datum) => write_value(datum)));
}

type MessageInformation = {
  message_type_id: number;
  message_stream_id: number
  message_length: number;
  timestamp: number;
  timestamp_delta: number | null;
  is_extended_timestamp: boolean;
}

const MessageType = {
  SetChunkSize: 1,
  Abort: 2,
  Acknowledgement: 3,
  UserControl: 4,
  WindowAcknowledgementSize: 5,
  SetPeerBandwidth: 6,
  Audio: 8,
  Video: 9,
  DataAMF3: 15,
  CommandAMF3: 17,
  DataAMF0: 18,
  CommandAMF0: 20,
} as const;

type Message = Omit<MessageInformation, 'timestamp_delta' | 'is_extended_timestamp'> & {
  data: Buffer;
}

async function* read_message(reader: AsyncByteReader): AsyncIterable<Message> {
  let chunk_maximum_size = 128; // システムメッセージにより変化する
  const informations = new Map<number, MessageInformation>();
  const chunks = new Map<number, Buffer[]>();
  const lengths = new Map<number, number>();

  while (true) {
    const basic = await reader.readU8();
    const fmt = (basic & 0b11000000) >> 6;
    let cs_id = (basic & 0b00111111) >> 0;
    switch (cs_id) {
      case 0: cs_id = 64 + await reader.readU8(); break;
      case 1: cs_id = 64 + await reader.readU16LE(); break;
    }
    if (fmt !== 3) { chunks.set(cs_id, []); lengths.set(cs_id, 0); }
    const chunk_buffers = chunks.get(cs_id) ?? [];
    const chunk_length = lengths.get(cs_id) ?? 0;
    let information = informations.get(cs_id);

    let timestamp = fmt !== 3 ? await reader.readU24BE() : information?.timestamp;
    const message_length = fmt === 0 || fmt === 1 ? await reader.readU24BE() : information?.message_length;
    const message_type_id = fmt === 0 || fmt === 1 ? await reader.readU8() : information?.message_type_id;
    const message_stream_id = fmt === 0 ? await reader.readU32LE() : information?.message_stream_id;
    // 情報がない時は何もできないしパースもできない。今回は簡単のため受信をやめて return する
    const is_extended_timestamp = fmt !== 3 ? timestamp === 0xFFFFFF : information?.is_extended_timestamp;
    if (timestamp == null || message_length == null || message_type_id == null || message_stream_id == null || is_extended_timestamp == null) { return; }
    const extended_timestamp = is_extended_timestamp ? await reader.readU32BE() : null;

    let timestamp_delta = null;
    if (fmt === 1 || fmt === 2) {
      if (information?.timestamp == null) { return; }
      timestamp_delta = extended_timestamp ?? timestamp;
      timestamp = information.timestamp + timestamp_delta;
    } else if (fmt === 3) {
      timestamp_delta = extended_timestamp ?? information?.timestamp_delta ?? timestamp;
      timestamp += (chunk_length > 0) ? 0 : timestamp_delta;
    } else {
      timestamp = extended_timestamp ?? timestamp;
    }

    information = { message_type_id, message_stream_id, message_length, timestamp, timestamp_delta, is_extended_timestamp } satisfies MessageInformation;
    const chunk = await reader.read(Math.min(message_length - chunk_length, chunk_maximum_size));
    const length = chunk_length + chunk.byteLength;
    if (length >= message_length) {
      if (length === message_length) {
        const data = Buffer.concat([... chunk_buffers, chunk])
        if (message_type_id === MessageType.SetChunkSize) {
          chunk_maximum_size = Math.max(1, Math.min(2 ** 31 - 1, data.readUInt32BE(0)));
        } else {
          const { timestamp_delta, is_extended_timestamp, ... message } = information;
          yield { ... message, data };
        }
      }
      chunks.set(cs_id, []);
      lengths.set(cs_id, 0);
    } else {
      chunks.set(cs_id, [... chunk_buffers, chunk]);
      lengths.set(cs_id, length);
    }
    informations.set(cs_id, information);
  }
}

const write_message = (message: Omit<Message, 'message_length'>): Buffer => {
  let chunk_maximum_size = 128;
  const result: Buffer[] = [];

  for (let i = 0; i < message.data.byteLength; i += chunk_maximum_size) {
    const chunk = message.data.subarray(i, Math.min(message.data.byteLength, i + chunk_maximum_size));
    const cs_id = 3; // こちらからはシステム系のメッセージしか送らないので 3 に固定する
    const fmt = i === 0 ? 0 : 3; // 簡単化のため 0 か 3 以外は使わない
    const basic = Buffer.from([(fmt << 6) | cs_id]);

    if (fmt === 3) {
      result.push(basic, chunk);
      continue;
    }

    const header = Buffer.alloc(message.timestamp >= 0xFFFFFF ? 15 : 11);
    header.writeUIntBE(Math.min(message.timestamp, 0xFFFFFF), 0, 3);
    header.writeUIntBE(message.data.byteLength, 3, 3);
    header.writeUIntBE(message.message_type_id, 6, 1);
    header.writeUIntLE(message.message_stream_id, 7, 4);
    if (message.timestamp >= 0xFFFFFF) {
      header.writeUIntBE(message.timestamp, 11, 4);
    }

    result.push(basic, header, chunk);
  }

  return Buffer.concat(result);
}

class BandwidthQuotaExceededError extends Error {
  constructor(message: string, option?: ErrorOptions) {
    super(message, option);
    this.name = this.constructor.name;
  }
}

type BandwidthEstimatorOption = {
  intervalMills: number;
  movingAverageLength: number;
}
const BandwidthEstimatorOption = {
  from(option?: Partial<BandwidthEstimatorOption>): BandwidthEstimatorOption {
    return { intervalMills: 100, movingAverageLength: 10, ... option };
  }
}

class BandwidthEstimator {
  private option: BandwidthEstimatorOption;
  private controller: AbortController;
  private limit: number;
  private timerId: NodeJS.Timeout | null = null;
  private readonly measureHandler = this.measure.bind(this);
  private estimates: number[] = [];
  private movingAverageLength: number;
  private totalBytes: number = 0;
  private previousTime: DOMHighResTimeStamp = performance.now();

  public constructor(limit: number, controller: AbortController, option?: Partial<BandwidthEstimatorOption>) {
    this.limit = limit;
    this.controller = controller;
    this.option = BandwidthEstimatorOption.from(option);
    this.movingAverageLength = this.option.movingAverageLength;
    this.timerId = setInterval(this.measureHandler, this.option.intervalMills);
  }

  public feed(byteLenght: number) {
    this.totalBytes += byteLenght;
  }

  public estimate(): number {
    return this.estimates.reduce((sum, curr) => sum + curr, 0) / this.movingAverageLength;
  }

  public measure(): void {
    const currentTime = performance.now();
    const duration = (currentTime - this.previousTime) / 1000;
    const bandwidth = (this.totalBytes * 8) / duration;
    this.estimates.push(bandwidth);

    if (this.estimates.length >= this.movingAverageLength) {
      this.estimates.splice(0, this.estimates.length - this.movingAverageLength);
      const average = this.estimate();
      if (average >= this.limit) {
        this.controller.abort(new BandwidthQuotaExceededError(`Bandwidth quota exceeded! limit: ${this.limit}, actual: ${Math.ceil(average)}`));
      }
    }

    this.previousTime = currentTime;
    this.totalBytes = 0;
  }

  public destroy(): void {
    if (this.timerId == null) { return; }
    clearInterval(this.timerId);
    this.timerId = null;
  }
}

const options = {
  rtmp: {
    type: 'string',
    default: '1935',
  },
  web: {
    type: 'string',
    default: '8000',
  },
  app: {
    type: 'string'
  },
  streamKey: {
    type: 'string'
  },
  highWaterMark: {
    type: 'string'
  },
  bandwidth: {
    type: 'string'
  }
} as const satisfies ParseArgsOptionsConfig;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.rtmp, 10))) {
  console.error('Please Specify valid port number'); process.exit(1);
}
if (Number.isNaN(Number.parseInt(args.web, 10))) {
  console.error('Please Specify valid port number'); process.exit(1);
}
if (args.app == null) {
  console.error('Please Specify valid app'); process.exit(1);
}
if (args.streamKey == null) {
  console.error('Please Specify valid streamKey'); process.exit(1);
}
if (args.highWaterMark != null && Number.isNaN(Number.parseInt(args.highWaterMark, 10))) {
  console.error('Please Specify valid highwatermark'); process.exit(1);
}
if (args.bandwidth != null && Number.isNaN(Number.parseInt(args.bandwidth, 10))) {
  console.error('Please Specify valid bandwidth'); process.exit(1);
}
const port = Number.parseInt(args.rtmp, 10);
const web = Number.parseInt(args.web, 10);
const app = args.app;
const streamKey = args.streamKey;
const highWaterMark = args.highWaterMark != null ? Number.parseInt(args.highWaterMark, 10) : undefined;
const bandwidth = args.bandwidth != null ? Number.parseInt(args.bandwidth, 10) : undefined;

const handle_handshake = async (reader: AsyncByteReader, connection: Duplex): Promise<boolean> => {
  // C0/S0
  await reader.readU8(); // Read C0
  connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
  // C1/S1
  const c1 = await reader.read(1536); // read C1
  const c1_random = c1.subarray(8);
  const s1_random = randomBytes(1536 - 8);
  const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
  connection.write(s1); // write C1
  // C2/S2
  const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
  connection.write(s2); // write S2
  const c2 = await reader.read(1536); // read C2
  const c2_random_echo = c2.subarray(8);
  // Check Random Echo
  return s1_random.equals(c2_random_echo);
}

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
  DISCONNECTED: 'DISCONNECTED',
} as const;
type Option = {
  id: Symbol;
  app: string;
  streamKey: string;
};

const MESSAGE_STREAM = 1;
let lock: Symbol | null = null;
const need_yield = (state: (typeof STATE)[keyof typeof STATE], message: Message): boolean => {
  if (state !== STATE.PUBLISHED) { return false; }
  if (message.message_stream_id !== MESSAGE_STREAM) { return false; }

  switch (message.message_type_id) {
    case MessageType.Audio: return true;
    case MessageType.Video: return true;
    case MessageType.DataAMF0: return true;
    default: return false;
  }
};
const TRANSITION = {
  [STATE.WAITING_CONNECT]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== 0) { return STATE.WAITING_CONNECT; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.WAITING_CONNECT; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'connect') { return STATE.WAITING_CONNECT; }
    const transaction_id = command[1];
    const appName = command[2]['app'];
    const connectAccepted = appName === option.app;

    const info = connectAccepted ? {
      code: 'NetConnection.Connect.Success',
      description: 'Connection succeeded.',
      data: { version: '3,5,7,7009', },
      objectEncoding: 0, // 0 = AMF0, 3 = AMF3
      level: 'status', // 正常系
    } : {
      code: 'NetConnection.Connect.Rejected',
      description: 'Connection rejected.',
      level: 'error', // 異常系
    };

    const result = write_amf0(
      '_result',
      transaction_id,
      {
        fmsVer: 'FMS/3,5,7,7009',
        capabilities: 31,
        mode: 1,
      },
      info
    );
    connection.write(write_message({
      message_type_id: MessageType.CommandAMF0,
      message_stream_id: 0,
      timestamp: 0,
      data: result,
    }));

    if (!connectAccepted) { return STATE.DISCONNECTED; }
    return STATE.WAITING_CREATESTREAM;
  },
  [STATE.WAITING_CREATESTREAM]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== 0) { return STATE.WAITING_CREATESTREAM; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.WAITING_CREATESTREAM; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'createStream') { return STATE.WAITING_CREATESTREAM; }
    const transaction_id = command[1];

      // message_stream_id は 0 が予約されている (今使ってる) ので 1 を利用する
    const result = write_amf0('_result', transaction_id, null, MESSAGE_STREAM);
    connection.write(write_message({
      message_type_id: MessageType.CommandAMF0,
      message_stream_id: 0,
      timestamp: 0,
      data: result,
    }));

    return STATE.WAITING_PUBLISH;
  },
  [STATE.WAITING_PUBLISH]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== MESSAGE_STREAM) { return STATE.WAITING_PUBLISH; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.WAITING_PUBLISH; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'publish') { return STATE.WAITING_PUBLISH; }
    const transaction_id = command[1];
    const streamKey = command[3];
    const publishAccepted = streamKey === option.streamKey && lock == null; // streamKey が合致していて、配信されてない場合は配信を許可する

    const info = publishAccepted ? {
      code: 'NetStream.Publish.Start',
      description: 'Publish Accepted',
      level: 'status' // 正常系
    } : {
      code: 'NetStream.Publish.Failed', // Permision Denied
      description: 'Publish Failed',
      level: 'error' // 異常系
    };

    const result = write_amf0('onStatus', transaction_id, null, info);
    connection.write(write_message({
      message_type_id: MessageType.CommandAMF0,
      message_stream_id: message.message_stream_id,
      timestamp: 0,
      data: result,
    }));

    if (!publishAccepted) { return STATE.DISCONNECTED; }
    lock = option.id;
    return STATE.PUBLISHED;
  },
  [STATE.PUBLISHED]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    if (message.message_stream_id !== 0) { return STATE.PUBLISHED; }
    if (message.message_type_id !== MessageType.CommandAMF0) { return STATE.PUBLISHED; }
    const command = read_amf0(message.data);

    const name = command[0];
    if (name !== 'deleteStream') { return STATE.PUBLISHED; }
    const stream = command[3];
    if (stream !== MESSAGE_STREAM) { return STATE.PUBLISHED; }

    return STATE.DISCONNECTED;
  },
  [STATE.DISCONNECTED]: (message: Message, connection: Duplex, option: Option): (typeof STATE)[keyof typeof STATE] => {
    return STATE.DISCONNECTED;
  }
} as const satisfies Record<(typeof STATE)[keyof typeof STATE], (message: Message, connection: Duplex, option: Option) => (typeof STATE)[keyof typeof STATE]>;

class DisconnectError extends Error {
  constructor(message: string, option?: ErrorOptions) {
    super(message, option);
    this.name = this.constructor.name;
  }
}

async function* handle_rtmp(connection: Duplex, app: string, key: string, limit?: number): AsyncIterable<Message> {
  const option = {
    id: Symbol(),
    app: app,
    streamKey: key,
  } satisfies Option;
  const controller = new AbortController();
  const reader = new AsyncByteReader({ signal: controller.signal });
  const estimator = new BandwidthEstimator(limit ?? Number.POSITIVE_INFINITY, controller);
  connection.pipe(new Writable({
    write(data, _, cb) { reader.feed(data); estimator.feed(data.byteLength); cb(); },
  }));
  const disconnected = () => { controller.abort(new DisconnectError('Disconnected!')); }
  connection.addListener('close', disconnected);

  try {
    /*
    * RTMPのハンドシェイクを処理する
    */
    if (!await handle_handshake(reader, connection)) { return; }
    /*
    * RTMPのメッセージを処理する
    */
    let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
    for await (const message of read_message(reader)) {
      // 共通で処理するメッセージはここで処理する

      // 上位に伝える映像/音声/データのメッセージだったら伝える
      if (need_yield(state, message)) { yield message; }

      // 個別のメッセージによる状態遷移
      state = TRANSITION[state](message, connection, option);
      if (state === STATE.DISCONNECTED) { return; }
    }
  } catch (e) {
    throw e;
  } finally {
    connection.removeListener('close', disconnected);
    if (lock === option.id) { lock = null; }
    connection.destroy();
    estimator.destroy();
    reader.feedEOF();
  }
}

type VideoMessage = Message & {
  message_type_id: (typeof MessageType.Video);
};
type AudioMessage = Message & {
  message_type_id: (typeof MessageType.Audio);
}
type AMF0DataMessage = Message & {
  message_type_id: (typeof MessageType.DataAMF0);
}

const VideoCodecType = {
  AVC: 7,
} as const;

const AudioCodecType = {
  AAC: 10,
} as const;

type AVCData = {
  codec: 'AVC',
} & ({
  packetType: 0,
  avcDecderCongiurationRecord: Buffer;
} | {
  packetType: 1,
  compositionTimeOffset: number;
  data: Buffer;
});
type VideoData = AVCData & {
  timestamp: number;
  type: number;
  kind: 'Video';
};

type AACData = {
  codec: 'AAC',
} & ({
  packetType: 0;
  audioSpecificConfig: Buffer;
} | {
  packetType: 1;
  data: Buffer;
});
type AudioData = AACData & {
  timestamp: number;
  kind: 'Audio';
};

type ActionScriptData = {
  timestamp: number;
  kind: 'Data',
  values: any[];
}

const handle_avc = (reader: ByteReader): AVCData | null => {
  const packetType = reader.readU8();
  const compositionTimeOffset = reader.readI24BE();

  switch (packetType) {
    case 0: return {
      codec: 'AVC',
      packetType,
      avcDecderCongiurationRecord: reader.read(-1),
    };
    case 1: return {
      codec: 'AVC',
      packetType,
      compositionTimeOffset,
      data: reader.read(-1),
    }
    default:
      return null;
  }
}

const handle_video = (message: VideoMessage): VideoData | null => {
  const reader = new ByteReader(message.data);

  const meta = reader.readU8();
  const type = (meta & 0xF0) >> 4;
  const codec = (meta & 0x0F) >> 0;

  switch (codec) {
    case VideoCodecType.AVC: {
      const avc = handle_avc(reader);
      if (avc == null) { return null; }
      return { kind: 'Video', type, timestamp: message.timestamp, ... avc };
    }
    default:
      return null;
  }
}

const handle_aac = (reader: ByteReader): AACData | null => {
  const packetType = reader.readU8();
  switch (packetType) {
    case 0: return {
      codec: 'AAC',
      packetType,
      audioSpecificConfig: reader.read(-1)
    };
    case 1: return {
      codec: 'AAC',
      packetType,
      data: reader.read(-1),
    }
    default:
      return null;
  }
}

const handle_audio = (message: AudioMessage): AudioData | null => {
  const reader = new ByteReader(message.data);

  const meta = reader.readU8();
  const codec = (meta & 0xF0) >> 4;
  const rate = (meta & 0x0C) >> 2;
  const size = (meta & 0x02) >> 1;
  const type = (meta & 0x01) >> 0;

  switch (codec) {
    case AudioCodecType.AAC: {
      // In AAC: rate, size, type are derived in AudioSpecificConfig
      const aac = handle_aac(reader);
      if (aac == null) { return null; }
      return { kind: 'Audio', timestamp: message.timestamp, ... aac };
    }
    default:
      return null;
  }
}

const handle_amf0_data = (message: AMF0DataMessage): ActionScriptData | null => {
  const values = read_amf0(message.data);
  return { kind: 'Data', timestamp: message.timestamp, values };
}

const handle_rtmp_payload = (message: Message): VideoData | AudioData | ActionScriptData | null => {
  switch (message.message_type_id) {
    case MessageType.Video: return handle_video(message as VideoMessage);
    case MessageType.Audio: return handle_audio(message as AudioMessage);
    case MessageType.DataAMF0: return handle_amf0_data(message as AMF0DataMessage);
    default: return null;
  }
}

const PMT_PID = 256;
const PCR_PID = 257;
const AVC_PID = 258;
const AAC_PID = 259;
const PAT_DATA = {
  transport_stream_id: 0,
  programs: [{
    program_number: 1,
    program_map_PID: PMT_PID
  }]
} as const satisfies PAT;
const PMT_DATA = {
  program_number: 1,
  pcr_pid: PCR_PID,
  streams: [{
    stream_type: 0x1b,
    elementary_PID: AVC_PID,
  }, {
    stream_type: 0x0F,
    elementary_PID: AAC_PID,
  }]
} as const satisfies PMT;
const emit_PSI_interval = 100;

const timestamp_from_rtmp_to_mpegts = (timestamp: number): number => {
  return timestamp * 90;
}

class MPEGTSTransmuxer {
  private avcDecoderConfigurationRecord: AVCDecoderConfigurationRecord | null = null;
  private audioSpecificConfig: AudioSpecificConfig | null = null;
  private patPacketizer = new SectionPacketizer(0);
  private pmtPacketizer = new SectionPacketizer(PMT_PID);
  private pcrPacketizer = new PCRPacketizer(PCR_PID);
  private avcPacketizer = new PESPacketizer(AVC_PID);
  private aacPacketizer = new PESPacketizer(AAC_PID);
  private last_emit_PSI_timestamp: number | null = null;

  public *feed(message: Message): Iterable<Buffer> {
    if (this.last_emit_PSI_timestamp == null || (message.timestamp - this.last_emit_PSI_timestamp) >= emit_PSI_interval) {
      const packets = [
        ... this.patPacketizer.packetize(write_pat(PAT_DATA)),
        ... this.pmtPacketizer.packetize(write_pmt(PMT_DATA)),
        this.pcrPacketizer.packetize(timestamp_from_rtmp_to_mpegts(message.timestamp)),
      ];

      yield* packets;
      this.last_emit_PSI_timestamp = message.timestamp;
    }

    const payload = handle_rtmp_payload(message);
    if (payload == null) { return; }

    switch (payload.kind) {
      case 'Video':
        if (payload.codec !== 'AVC') { return; }
        if (payload.packetType === 0) {
          this.avcDecoderConfigurationRecord = read_avc_decoder_configuration_record(payload.avcDecderCongiurationRecord);
          return;
        }
        break;
      case 'Audio':
        if (payload.codec !== 'AAC') { return; }
        if (payload.packetType === 0) {
          this.audioSpecificConfig = read_audio_specific_config(payload.audioSpecificConfig);
          return;
        }
        break;
      default:
        return;
    }

    switch (payload.kind) {
      case 'Video':
        if (this.avcDecoderConfigurationRecord == null) { return; }
        yield* this.avcPacketizer.packetize(
          write_pes(
            write_annexb_avc(payload.data, this.avcDecoderConfigurationRecord),
            0xe0,
            timestamp_from_rtmp_to_mpegts(payload.timestamp + payload.compositionTimeOffset),
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            true
          )
        );
        break;
      case 'Audio':
        if (this.audioSpecificConfig == null) { return; }
        yield* this.aacPacketizer.packetize(
          write_pes(
            write_adts_aac(payload.data, this.audioSpecificConfig),
            0xc0,
            timestamp_from_rtmp_to_mpegts(payload.timestamp),
            null,
            false
          )
        );
        break;
    }
  }
}

type StreamingInformation = [writeFn: (buffer: Buffer) => void, exitFn: () => void];
const streaming = new Map<number, StreamingInformation>();
const handle = async (connection: Duplex) => {
  const rtmp_to_mpegts = new MPEGTSTransmuxer();
  try {
    for await (const message of handle_rtmp(connection, app, streamKey, bandwidth)) {
      for (const packet of rtmp_to_mpegts.feed(message)) {
        for (const [write, _] of streaming.values()) { write(packet); }
      }
    }
  } catch (e) {
    console.error(e);
  } finally {
    for (const [_, end] of streaming.values()) { end(); }
  }
};

const rtmp_server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle(connection);
});
rtmp_server.listen(port);

let viewers = 0;
const web_server = http.createServer({ highWaterMark }, (req, res) => {
  if (req.url == null) {
    res.writeHead(404, { 'access-control-allow-origin': '*' });
    res.end();
    return;
  }
  const url = new URL(req.url, `http://localhost:${web}`);
  if (!(req.method === 'GET' && url.pathname === `/${app}/${streamKey}`)) {
    res.writeHead(404, { 'access-control-allow-origin': '*' });
    res.end();
    return;
  }
  const viewer = viewers++;
  res.writeHead(200, {
    'content-type': 'video/mp2t',
    'access-control-allow-origin': '*',
  });

  const write = (chunk: Buffer) => {
    if (res.closed) { return; }
    if (!res.write(chunk) && highWaterMark != null) { res.destroy(); }
  };

  const exit = () => { res.end(); streaming.delete(viewer); };
  const entry = [write, exit] satisfies StreamingInformation;
  streaming.set(viewer, entry);

  req.on('close', exit);
  res.on('close', exit);
  req.on('error', exit);
  res.on('error', exit);
});
web_server.listen({ port: web });