PLAY DEVELOPERS BLOG

HuluやTVerなどの日本最大級の動画配信を支える株式会社PLAYが運営するテックブログです。

HuluやTVerなどの日本最大級の動画配信を支える株式会社PLAYが運営するテックブログです。

動画配信技術を実装して理解する〜RTMP 1.0 受信編〜

こんにちは、2025年度から技術推進室に異動しました、石川です。以前はフルスタックエンジニアとして動画配信における字幕に関わっていました。システムプログラミングから様々なプレイヤーの字幕表示まで、字幕に関わる問題をワンストップで一貫して対応するエンジニアとして活動していました。

今回は、趣味の一環として動画配信の技術を自分で実装して遊んでいるので "動画配信技術を実装して理解する"と題して、様々な動画配信技術を実装する観点から紹介する記事を書いてみたいと思います。ちなみに、不定期連載になる予定です。

今回は、まだまだ打ち上げプロトコルとして現役なRTMPの受信部分を実際に実装して理解してみましょう。

RTMPとは?

そもそもRTMPとはなんなのでしょうか?

RTMPはAdobeが開発したリアルタイムメッセージング用の通信プロトコルです。ただ、紆余曲折があり現在はVeoveraが仕様を管理しています。Flashが廃れたことでAdobeはRTMPにも積極的な関与がない状態になりましたが、RTMPは打ち上げにおいて現役で需要があるためAdobeを含めた団体で管理するようにしたようです。

RTMPはTCPを使ったプロトコルなので安定感が高く、パケットの欠落が発生しないといった長所から、今でも配信の打ち上げにおいて現役のプロトコルです。やはり多少のリアルタイム性よりパケットロスの方が視聴影響が大きい場合が多く、詰まっても全部送ってくれる方が使いやすいという点もあります。

また、"Enhanced RTMP"と呼ばれるRTMPの拡張仕様をVeoveraが策定しており、過去のRTMP1.0の仕様もGitHub上で公開しています。Adobeが仕様の公開をやめてしまいましたが、現在ではVeoveraが代わりに提供してくれるので助かりますね。"Enhanced RTMP"の記事はまた別途書こうと思いますが、V1で様々なコーデックのサポート、V2でマルチトラックができるようになるなど、まだまだ進化を続けています。

RTMPの基本要素

まず、RTMPの基本的な要素であるハンドシェイクや、送信単位として存在するストリーム、メッセージ、チャンクという概念を説明します。また、データをシリアライズして送るAMFという仕様もあるので、一緒に紹介します。

ハンドシェイク (Handshake)

RTMPはTCP通信ですが、他のTCP上のメッセージングと区別するためにRTMP自体でもハンドシェイクを行います。

お互いにプロトコル番号とランダムなデータを交換し合い、突合することでRTMPの接続を確立します。いわゆるイクサの前のアイサツみたいなものです。

ストリーム (Stream)

RTMPではストリームという単位で論理的な通信路を定義します。

ストリームとメッセージとチャンクの関係

複数の通信路を使うことで複数のデータや動画像を同時に多重化して送れます。 0番目のストリームがデフォルトのシステム用ストリームであり、他のストリームはRTMP上でサーバとクライアントでの交渉の後に利用します。

メッセージ (Message)

RTMPでは、制御情報、動画などはメッセージという形でストリーム上で伝達されます。

メッセージにはメッセージの種別、データの他に、タイムスタンプが付与されています。全てのメッセージにタイムスタンプがあるため、動画のタイムスタンプもRTMP上のタイムスタンプで管理されています。

これには困った点もあり、RTMPは1ms単位なので動画のタイムスタンプを正確に表せないという問題があります。例えば、映像や音声は1フレームが1ms単位だと割り切れません。30fpsの場合33.33...msとなりますが、RTMP上では33ms,33ms,34msというように誤差がある形で送ります。なので、本当の時間に対して分解能の問題で誤差がある形で送らざる得なく、誤差の分を後から訂正していくように送ります。

RTMP上では動画は連続しているので継ぎ足すようにすれば問題ないといえばないのですが...。動画のタイムスタンプを正確に知りたい場合には、動画のコーデック内の情報から時間を積算する必要がある点には注意が必要です。

チャンク (Chunk)

RTMPにおいてメッセージはチャンクという単位で多重化され送られます。

大容量のデータを一気に送るとそのデータによって他のデータの通信が影響をうけ、その間に流せなくなってしまいます。なので、チャンクに分割してインターリーブさせて送るということです。チャンクの最大長はメッセージによって変更でき、動画像などの大容量のものを送る場合にはオーバーヘッドを少なくするべく最大長を増やして送るのが主流です。

AMF (Action Message Format)

RTMPでは、RPCや動画のメタデータにAMF(Action Message Format)を利用します。

これは要するに、RTMPで使うActionScriptのObjectやPrimitiveのシリアライズを定めたものです。AMFにはAMF0とAMF3という2つのバージョンがありますが、配信においては大体AMF0が使われています。

Veoveraとしては、より効率的なAMF3の方を使って欲しいようですが...。

RTMPの実装の流れ

では、実際にRTMPで使う技術を実装してみましょう。実装にはNode.js(v22)とTypeScriptを用います。 個人的にTypeScriptが手グセである程度綺麗にかけて好きなので使っていますが、非同期処理が楽に書ける言語であればなんでも大丈夫でしょう。

ユーティリティ系

まず、実装を簡素化するためにユーティリティを定義します。

非同期読み取り

Node.jsやWebStreamのReadableStreamはバイト数を指定して取れないため、自作のReaderを用意します。Promiseの扱いが面倒ですが、逆にそこさえ気をつければ難しくはありません。

非同期読み取りクラス (クリックで展開)

async-byte-reader.ts

export default class AsyncByteReader {
  private buffers: Buffer[] = [];
  private offset = 0;
  private totals = 0;
  private promises: [number, (result: Buffer) => void][] = [];

  private fulfill(): void {
    while (this.promises.length > 0) {
      const [length, resolve] = this.promises[0];
      if (this.totals < length) { break; }

      const drained = [];
      let remains = length;
      while (this.buffers.length > 0) {
        const buffer = this.buffers[0];
        const capacity = buffer.byteLength - this.offset;

        if (capacity > remains) {
          drained.push(buffer.subarray(this.offset, this.offset + remains));
          this.offset += remains;
          break;
        }

        drained.push(buffer.subarray(this.offset));
        this.buffers.shift();
        this.offset = 0;
        remains -= capacity;
      }

      this.totals -= length;
      resolve(Buffer.concat(drained));
      this.promises.shift();
    }
  }

  public push(buffer: Buffer): void {
    this.buffers.push(buffer);
    this.totals += buffer.byteLength;
    this.fulfill();
  }

  public async read(size: number): Promise<Buffer> {
    const { promise, resolve } = Promise.withResolvers<Buffer>();
    this.promises.push([size, resolve]);
    this.fulfill();
    return promise;
  }

  public async readUIntBE(length: number): Promise<number> {
    return (await this.read(length)).readUIntBE(0, length);
  }

  public async readUIntLE(length: number): Promise<number> {
    return (await this.read(length)).readUIntLE(0, length);
  }

  public readU8(): Promise<number> {
    return this.readUIntBE(1);
  }

  public readU16BE(): Promise<number> {
    return this.readUIntBE(2);
  }

  public readU24BE(): Promise<number> {
    return this.readUIntBE(3);
  }

  public readU32BE(): Promise<number> {
    return this.readUIntBE(4);
  }

  public readU16LE(): Promise<number> {
    return this.readUIntLE(2);
  }

  public readU24LE(): Promise<number> {
    return this.readUIntLE(3);
  }

  public readU32LE(): Promise<number> {
    return this.readUIntLE(4);
  }
}

バイト列読み取りのユーティリティ

上記の非同期読み取りを簡素化した同期版のバイト列の読み取り用Readerも使います。 いちいちオフセットを勘定するコードが入るとコードが宣言的じゃなくなって嫌ですし。

同期読み取りクラス (クリックで展開)

byte-reader.ts

export default class ByteReader {
  private buffer: Buffer;
  private offset = 0;

  public constructor(buffer: Buffer) {
    this.buffer = buffer;
  }

  public isEOF(): boolean {
    return this.offset >= this.buffer.byteLength;
  }

  public read(length: number): Buffer {
    if (length < 0) { length = this.buffer.byteLength - this.offset; }
    const value = this.buffer.subarray(this.offset, this.offset + length);
    this.offset += length;
    return value;
  }

  public readUIntBE(length: number): number {
    return this.read(length).readUIntBE(0, length);
  }

  public readIntBE(length: number): number {
    return this.read(length).readIntBE(0, length);
  }

  public readU8(): number {
    return this.readUIntBE(1);
  }

  public readU16BE(): number {
    return this.readUIntBE(2);
  }

  public readU24BE(): number {
    return this.readUIntBE(3);
  }

  public readU32BE(): number {
    return this.readUIntBE(4);
  }

  public readI8(): number {
    return this.readIntBE(1);
  }

  public readI16BE(): number {
    return this.readIntBE(2);
  }

  public readI24BE(): number {
    return this.readIntBE(3);
  }

  public readI32BE(): number {
    return this.readIntBE(4);
  }

  public readF64BE(): number {
    return this.read(8).readDoubleBE(0);
  }
}

ハンドシェイク (Handshake)

RTMP におけるハンドシェイクは以下の3種類のデータで構成されています。

  • C0/S0: バージョン番号 (1byte) ※ 0x03 固定
  • C1/S1: ローカル時間(4byte) + Reserved(4byte) + Random(1528byte)
  • C2/S2: 相手のローカル時間(4byte) + 現在のローカル時間(4byte) + 相手のRandom(1528byte)

C1/S1,C2/S2の時間(先頭8byte)に関しては、RTMPで映像を打ち上げる場合には0と考えて良いため、ハンドシェイクでは以下の点を確かめます。

  • 接続側: C1で送ったRandomとS2で受信したRandomが一致するか
  • サーバ側: S1で送ったRandomとC2で受信したRandomが一致するか

これらのRandom部分が一致すれば、RTMPのハンドシェイク成功で接続が確立できます。

また、大体の場合においてRTTを減らすべく、接続側がC0とC1は同時に送り、サーバ側は S0, S1, S2 を同時に送ります。その後、接続側がC2を送ります。S2を先に送らないとC2を応答しないクライアントが存在することに注意が必要です。

RTMPのハンドシェイクのシーケンス

実際のハンドシェイクのコードは以下のようになるでしょう。

handshake (これは擬似コードです)

// C0/S0
await reader.readU8(); // Read C0
connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
// C1/S1
const c1 = await reader.read(1536); // read C1
const c1_random = c1.subarray(8);
const s1_random = randomBytes(1536 - 8);
const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
connection.write(s1); // write C1
// C2/S2
const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
connection.write(s2); // write S2
const c2 = await reader.read(1536); // read C2
const c2_random_echo = c2.subarray(8);
// Check Random Echo
if (s1_random.byteLength !== c2_random_echo.byteLength || !s1_random.equals(c2_random_echo)) {
  throw new Error('Invalid Random Echo');
}

メッセージ

RTMPは双方向プロトコルであるため、お互いにメッセージを送り合います。しかも、メッセージはチャンクに分割して多重化するため、受信する際にはチャンクからメッセージを構成する必要があり、送信するためにはメッセージからチャンクに分割する必要があります。

メッセージとチャンクの変換

チャンクのフォーマット

チャンクは "chunk basic header", "chunk message header", "chunk data"の3つで構成されています。chunk basic headerの形は以下の通りです。

chunk basic header の種別と構造

  • fmt: 2bit
    • チャンクのフォーマットを定める値
    • 0 ~ 3 の4つの値を取る
  • cs_id: 6 bit
    • chunk stream id の略で現在送信中のchunkを管理するID
    • この値が0の場合は、実際のcs_idは 64 + 直後の1byte となる
    • この値が1の場合は、実際のcs_idは 64 + 直後の2byte (1byte目 + 2byte目 * 256 で LittleEndian) となる

chunk basic headerのfmtの値によって、後続のchunk message headerの種別が決まります。fmtの値がそのままTypeの値になります。各種別のType0からType3までのchunk message headerの形は以下の通りです。

chunk message header の種別と構造

  • timestamp: 3 byte
    • fmt が 0 の時はタイムスタンプそのもの
    • fmt が 1 か 2 の時は、前回の cs_id の timestamp からの加算値
    • 0xFFFFFF 以上の場合は 0xFFFFFF を送り、後述の extended_timestamp を送信し利用する
  • message_length: 3 byte
    • メッセージのデータ長
    • fmt が 0 か 1 の時に送られる
    • それ以外の時は以前のcs_idの値を利用する
  • message_type_id: 1 byte
    • メッセージの種類
    • fmt が 0 か 1 の時に送られる
    • それ以外の時は以前のcs_idの値を利用する
  • message_stream_id: 4 byte
    • メッセージがどのストリームに所属しているか
    • fmt が 0 の時に送られる
    • それ以外の時は以前のcs_idの値を利用する
  • extended_timestamp: 4 byte
    • 上述した timestamp が 0xFFFFFF の時に送られる
    • これがある場合には、こちらのtimestampを利用する
    • fmt=3でも以前のチャンクのtimestampが0xFFFFFFだった場合に送られる

そしてchunk dataが実際のチャンクで送られるデータとなります。この時も注意すべき点があります。

  • チャンクはチャンクの最大長かメッセージの終了で区切られる
  • 完全に固定長でスタッフィングされているわけではないので注意

チャンクのメッセージ化

fmtによって以前の情報を再利用する場合があるため、実装は結構複雑になります。 特にfmt=3である場合は、受信中のデータに継ぎ足す場合と、以前のcs_idの情報を元にメッセージを新規挿入する場合があるという点が注意です。

例えば、fmt=2のチャンクからメッセージがfmt=3のメッセージなどで完成した直後に、またfmt=3のメッセージが来た場合は、以前のfmt=2の情報を元にメッセージを組み立てます。すなわち、fmt=2で指定したtimestampfmt=3の時でも参照され、タイムスタンプの加算を行なった上で、新規にメッセージを作ります。また、逆にメッセージの途中で来た場合には既存のメッセージに継ぎ足しをする挙動になります。RTMPの仕様書の5.3.2のExampleで説明されている部分ですので、注意深く仕様を読みましょう。

message-reader.ts

import AsyncByteReader from "./async-byte-reader";

type MessageInformation = {
  message_type_id: number;
  message_stream_id: number
  message_length: number;
  timestamp: number;
  timestamp_delta: number | null;
  is_extended_timestamp: boolean;
}

export const MessageType = {
  SetChunkSize: 1,
  Abort: 2,
  Acknowledgement: 3,
  UserControl: 4,
  WindowAcknowledgementSize: 5,
  SetPeerBandwidth: 6,
  Audio: 8,
  Video: 9,
  DataAMF3: 15,
  CommandAMF3: 17,
  DataAMF0: 18,
  CommandAMF0: 20,
} as const;

export type Message = Omit<MessageInformation, 'timestamp_delta' | 'is_extended_timestamp'> & {
  data: Buffer;
}

export default async function* read_message(reader: AsyncByteReader): AsyncIterable<Message> {
  let chunk_maximum_size = 128; // システムメッセージにより変化する
  const informations = new Map<number, MessageInformation>();
  const chunks = new Map<number, Buffer[]>();
  const lengths = new Map<number, number>();

  while (true) {
    const basic = await reader.readU8();
    const fmt = (basic & 0b11000000) >> 6;
    let cs_id = (basic & 0b00111111) >> 0;
    switch (cs_id) {
      case 0: cs_id = 64 + await reader.readU8(); break;
      case 1: cs_id = 64 + await reader.readU16LE(); break;
    }
    if (fmt !== 3) { chunks.set(cs_id, []); lengths.set(cs_id, 0); }
    const chunk_buffers = chunks.get(cs_id) ?? [];
    const chunk_length = lengths.get(cs_id) ?? 0;
    let information = informations.get(cs_id);

    let timestamp = fmt !== 3 ? await reader.readU24BE() : information?.timestamp;
    const message_length = fmt === 0 || fmt === 1 ? await reader.readU24BE() : information?.message_length;
    const message_type_id = fmt === 0 || fmt === 1 ? await reader.readU8() : information?.message_type_id;
    const message_stream_id = fmt === 0 ? await reader.readU32LE() : information?.message_stream_id;
    // 情報がない時は何もできないしパースもできない。今回は簡単のため受信をやめて return する
    const is_extended_timestamp = fmt !== 3 ? timestamp === 0xFFFFFF : information?.is_extended_timestamp;
    if (timestamp == null || message_length == null || message_type_id == null || message_stream_id == null || is_extended_timestamp == null) { return; }
    const extended_timestamp = is_extended_timestamp ? await reader.readU32BE() : null;

    let timestamp_delta = null;
    if (fmt === 1 || fmt === 2) {
      if (information?.timestamp == null) { return; }
      timestamp_delta = extended_timestamp ?? timestamp;
      timestamp = information.timestamp + timestamp_delta;
    } else if (fmt === 3) {
      timestamp_delta = extended_timestamp ?? information?.timestamp_delta ?? timestamp;
      timestamp += (chunk_length > 0) ? 0 : timestamp_delta;
    } else {
      timestamp = extended_timestamp ?? timestamp;
    }

    information = { message_type_id, message_stream_id, message_length, timestamp, timestamp_delta, is_extended_timestamp } satisfies MessageInformation;
    const chunk = await reader.read(Math.min(message_length - chunk_length, chunk_maximum_size));
    const length = chunk_length + chunk.byteLength;
    if (length >= message_length) {
      if (length === message_length) {
        const data = Buffer.concat([... chunk_buffers, chunk])
        if (message_type_id === MessageType.SetChunkSize) {
          chunk_maximum_size = Math.max(1, Math.min(2 ** 31 - 1, data.readUInt32BE(0)));
        } else {
          const { timestamp_delta, is_extended_timestamp, ... message } = information;
          yield { ... message, data };
        }
      }
      chunks.set(cs_id, []);
      lengths.set(cs_id, 0);
    } else {
      chunks.set(cs_id, [... chunk_buffers, chunk]);
      lengths.set(cs_id, length);
    }
    informations.set(cs_id, information);
  }
}

メッセージのチャンク化

逆にメッセージをチャンクにして送るのは、cs_idfmt を限定できるのでだいぶ楽にできます。 ここでは cs_id は 3 固定にして、fmt は 0 と 3 しかとらないようにして簡易化しています。

message-writer.ts

import { Message } from "./message-reader";

export default write_message = (message: Omit<Message, 'message_length'>): Buffer => {
  let chunk_maximum_size = 128;
  const result: Buffer[] = [];

  for (let i = 0; i < message.data.byteLength; i += chunk_maximum_size) {
    const chunk = message.data.subarray(i, Math.min(message.data.byteLength, i + chunk_maximum_size));
    const cs_id = 3; // こちらからはシステム系のメッセージしか送らないので 3 に固定する
    const fmt = i === 0 ? 0 : 3; // 簡単化のため 0 か 3 以外は使わない
    const basic = Buffer.from([(fmt << 6) | cs_id]);

    if (fmt === 3) {
      result.push(basic, chunk);
      continue;
    }

    const header = Buffer.alloc(message.timestamp >= 0xFFFFFF ? 15 : 11);
    header.writeUIntBE(Math.min(message.timestamp, 0xFFFFFF), 0, 3);
    header.writeUIntBE(message.data.byteLength, 3, 3);
    header.writeUIntBE(message.message_type_id, 6, 1);
    header.writeUIntLE(message.message_stream_id, 7, 4);
    if (message.timestamp >= 0xFFFFFF) {
      header.writeUIntBE(message.timestamp, 11, 4);
    }

    result.push(basic, header, chunk);
  }

  return Buffer.concat(result);
}

AMF0

AMF0はActionScriptのシリアライズであるため、同じECMAScriptの一種である JavaScript との親和性が高いです。仕様はVeoveraのリポジトリから参照可能です。

昔は文字列をUTF-8にエンコードするのが大変でしたが、今はTextEncoder/TextDecoderがあるため楽になりました。

AMF0のデシリアライズ

仕様通り実装するだけですが、ObjectEndをSymbolとして処理を簡素化しています。

amf0-reader.ts

import ByteReader from "./byte-reader";

const UTF8Decoder = new TextDecoder('utf-8');
const scriptend: unique symbol = Symbol();

const string = (reader: ByteReader): string => {
  const length = reader.readU16BE();
  return UTF8Decoder.decode(reader.read(length));
}

const longstring = (reader: ByteReader): string => {
  const length = reader.readU32BE();
  return UTF8Decoder.decode(reader.read(length));
}

const object = (reader: ByteReader): Record<string, any> => {
  const object: Record<string, any> = {};
  while (true) {
    const name = string(reader);
    const val = value(reader);
    if (val === scriptend) { return object; }
    object[name] = val;
  }
}

const mixedarray = (reader: ByteReader): Record<string, any>  => {
  reader.readU32BE(); // length
  return object(reader);
}

const strictarray = (reader: ByteReader): any[] => {
  const length = reader.readU32BE();
  const array = [];
  for (let i = 0; i < length; i++) {
    array.push(value(reader));
  }
  return array;
}

const date = (reader: ByteReader): Date => {
  const timestamp = reader.readF64BE();
  const localtimeoffset = reader.readI16BE();
  return new Date(timestamp);
}

const value = (reader: ByteReader): any => {
  const tag = reader.readU8();
  switch (tag) {
    case 0: return reader.readF64BE();
    case 1: return reader.readU8() !== 0;
    case 2: return string(reader);
    case 3: return object(reader);
    case 4: throw new Error('Unsupported Tag: 4 (movie clip)');
    case 5: return null;
    case 6: return undefined;
    case 7: throw new Error('Unsupported Tag: 7 (reference)');
    case 8: return mixedarray(reader);
    case 9: return scriptend;
    case 10: return strictarray(reader);
    case 11: return date(reader);
    case 12: return longstring(reader);
  }
  throw new Error(`Invalid tag: ${tag}`);
}

export default (data: Buffer): any[] => {
  const reader = new ByteReader(data);
  const result = [];
  while (!reader.isEOF()) {
    result.push(value(reader));
  }
  return result;
}

AMF0のシリアライズ

こちらも仕様通り実装するだけになります。

amf0-writer.ts

const UTF8Encoder = new TextEncoder();

const number = (data: number): Buffer => {
  const buffer = Buffer.alloc(8)
  buffer.writeDoubleBE(data);
  return buffer;
}

const string = (data: string): Buffer => {
  const utf8 = Buffer.from(UTF8Encoder.encode(data));
  const length = Buffer.alloc(2);
  length.writeUInt16BE(utf8.byteLength);
  return Buffer.concat([length, utf8]);
}

const object = (obj: Record<string, any>): Buffer => {
  return Buffer.concat([... Object.entries(obj).map(([k, v]) => Buffer.concat([string(k), value(v)])), Buffer.from([0x00, 0x00, 0x09])]);
}

const array = (data: any[]): Buffer => {
  const length = Buffer.alloc(4);
  length.writeUInt32BE(data.length);
  return Buffer.concat([length, ... data.map((datum) => value(datum))]);
}

const date = (data: Date): Buffer => {
  const buffer = Buffer.alloc(10);
  buffer.writeDoubleBE(data.getTime(), 0);
  buffer.writeInt16BE(0, 8); // reserved
  return buffer;
}

const value = (data: any): Buffer => {
  if (data === null) { return Buffer.from([0x05]); }
  if (data === undefined) { return Buffer.from([0x06]); }
  if (Array.isArray(data)) { return Buffer.concat([Buffer.from([0x0a]), array(data)]); }
  if (data instanceof Date) { return Buffer.concat([Buffer.from([0x0b]), date(data)]); }
  switch (typeof data) {
    case 'number': return Buffer.concat([Buffer.from([0x00]), number(data)]);
    case 'boolean': return Buffer.from([0x01, data ? 1 : 0]);
    case 'string': return  Buffer.concat([Buffer.from([0x02]), string(data)]);
    case 'object': return Buffer.concat([Buffer.from([0x03]), object(data)]);
    default: return Buffer.from([]);
  }
}

export default (... data: any[]): Buffer => {
  return Buffer.concat(data.map((datum) => value(datum)));
}

配信のためのメッセージング

RTMPのハンドシェイクが終わったら、次にクライアントとサーバ間で配信のためのネゴシエーションをします。これは上述のAMF0を用いたメッセージでやり取りします。打ち上げ側がサーバ側のメソッドをRPCで叩くようなイメージです。

配信するためのメッセージングは以下の順番で行います。

  1. connect
    • 配信するための接続の承認を求めるメッセージ
    • いわゆるRTMPで利用するURLのアプリ名部分はここで取れます
    • NetConnection.Connect.Success を返却すると接続承認となる
    • 返却値の objectEncoding が 0 だと AMF0 で以後の通信をする
  2. FCPublish
    • (多分) ストリームキーを事前に送るため? のメッセージ
    • RTMPの仕様書にはないメッセージでFMSが使っていたものらしく、それに倣って送られる
    • OSSのプロダクトはこれを無視しても配信してくれるので、今回は何も返却せず放置
    • さらにonFCPublishを返却する場合もOSS実装間で揺れがあるのでカオス
  3. createStream
    • 動画の通信用のストリームを作るメッセージ
    • 新しく使いたいストリームの番号を返すと、そのストリームを利用する
  4. publish
    • ストリームキーでサーバに配信開始を要求するメッセージ
    • NetStream.Publish.Start を返却すると配信承認となり配信を開始する

シーケンス図で書くとこのような形になります。

RTMPの打ち上げのためのメッセージング

詳細な内容やAMFの構成はコードを見た方が早いので、コードを参照してください。返却値はもはやFMSが現役ではない以上、テンプレートみたいな固定値をAMFで送る様相みたいです。

RTMPのネゴシエーション (疑似コードです)

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
} as const;

let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
for await (const message of read_message(reader)) {
  // 本当は共通で処理するシステムメッセージ部分を書く、大体の打ち上げRTMPではなくても良いので省略

  switch (state) {
    case STATE.WAITING_CONNECT: {
      if (message.message_stream_id !== 0) { continue; }
      if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
      const command = read_amf0(message.data);

      const name = command[0];
      if (name !== 'connect') { continue; }
      const transaction_id = command[1];
      const appName = command[2]['app'];
      console.error(`appName: ${appName}`);

      const result = write_amf0(
        '_result',
        transaction_id,
        {
          fmsVer: 'FMS/3,5,7,7009',
          capabilities: 31,
          mode: 1,
        }, {
          code: 'NetConnection.Connect.Success',
          description: 'Connection succeeded.',
          data: {
            version: '3,5,7,7009',
          },
          objectEncoding: 0, // 0 = AMF0, 3 = AMF3
          level: 'status', // 正常系
        }
      );
      connection.write(write_message({
        message_type_id: MessageType.CommandAMF0,
        message_stream_id: 0,
        timestamp: 0,
        data: result,
      }));

      state = STATE.WAITING_CREATESTREAM;
      break;
    }
    case STATE.WAITING_CREATESTREAM: {
      if (message.message_stream_id !== 0) { continue; }
      if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
      const command = read_amf0(message.data);

      const name = command[0];
      if (name !== 'createStream') { continue; }
      const transaction_id = command[1];

      const result = write_amf0(
        '_result',
        transaction_id,
        null,
        1, // message_stream_id は 0 が予約されている (今使ってる) ので 1 を利用する
      );
      connection.write(write_message({
        message_type_id: MessageType.CommandAMF0,
        message_stream_id: 0,
        timestamp: 0,
        data: result,
      }));

      state = STATE.WAITING_PUBLISH;
      break;
    }
    case STATE.WAITING_PUBLISH: {
      if (message.message_stream_id !== 1) { continue; }
      if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
      const command = read_amf0(message.data);

      const name = command[0];
      if (name !== 'publish') { continue; }
      const transaction_id = command[1];
      const streamKey = command[3];
      console.error(`streamKey: ${streamKey}`);

      const result = write_amf0(
        'onStatus',
        transaction_id,
        null,
        {
          code: 'NetStream.Publish.Start',
          description: 'Publish Accepted',
          level: 'status' // 正常系
        }
      );
      connection.write(write_message({
        message_type_id: MessageType.CommandAMF0,
        message_stream_id: message.message_stream_id,
        timestamp: 0,
        data: result,
      }));

      state = STATE.PUBLISHED;
      break;
    }
    case STATE.PUBLISHED: {
      // 配信開始したらここにくる
      break;
    }
  }
}

FLVに変換する

RTMPを実装するついでにFLVを出力してみましょう。FLVはヘッダーとタグ(可変長パケット)で構成された動画フォーマットで、これもAdobeが策定し、現在はVeoveraが管理しています。

実はFLVはRTMPの映像/音声/データのメッセージに対してヘッダーをいじるだけで作ることができます。RTMPのメッセージとFLVのタグの関係は以下の通りです。RTMPのメッセージのヘッダーの順番とタイムスタンプの記載方法をいじるだけでFLVのタグになります。

RTMPのメッセージとFLVのタグの関係

FLVのヘッダーはマジックバイトみたいなもので、ほぼ固定の内容になります。また、タグの前に直前のタグの長さを記載します。最初のタグの場合は前のタグが存在しないので0を記載します。

実際に受信したデータを再生できる形式にできるとやっぱり胸が熱くなるので、ここでやってしまいましょう。

flv-writer.ts

import { Writable } from "stream";
import { Message, MessageType } from "./message-reader";

export default class FLVWriter {
  private output: Writable;
  private previousTagSize: number = 0;

  public constructor(output: Writable) {
    this.output = output;
    // FLV Header
    this.output.write(Buffer.from([
      0x46, 0x4C, 0x56, // Signature (FLV)
      1, // version
      4 | 1, // 4: Audio Present, 1: Video Present
      0, 0, 0, 9, // Header Bytes
    ]))
  }

  public write(message: Message): void {
    if (message.message_type_id !== MessageType.Audio && message.message_type_id !== MessageType.Video) { return; }

    const header = Buffer.alloc(15);
    header.writeUIntBE(this.previousTagSize, 0, 4);
    header.writeUIntBE(message.message_type_id, 4, 1)
    header.writeUIntBE(message.data.byteLength, 5, 3);
    header.writeUInt8(Math.floor(message.timestamp / (2 ** 16)) % (2 ** 8),  8);
    header.writeUInt8(Math.floor(message.timestamp / (2 **  8)) % (2 ** 8),  9);
    header.writeUInt8(Math.floor(message.timestamp / (2 **  0)) % (2 ** 8), 10);
    header.writeUInt8(Math.floor(message.timestamp / (2 ** 24)) % (2 ** 8), 11);
    header.writeUIntBE(0, 12, 3);

    this.output.write(header);
    this.output.write(message.data);
    this.previousTagSize = (header.byteLength + message.data.byteLength) - 4;
  }
}

RTMP 受信

全体的なRTMP受信のコードは以下のようになります。

このサンプルコードでは --flv という引数でRTMPから動画をFLVを抜き出してファイルや標準出力に出力できるようにしています。なので、ffmpegやOBSなどの配信ソフトから打ち上げた映像をそのままFLVにダンプでき、そのファイルはffplayなどで再生できます。デバッグなどで便利かもしれません。

index.ts

import net from "node:net";
import { randomBytes } from "node:crypto";
import { Duplex } from "node:stream";
import fs from "node:fs";
import { parseArgs } from "node:util";

import AsyncByteReader from "./async-byte-reader";
import read_message, { MessageType } from "./message-reader";
import write_message from "./message-writer";
import read_amf0 from "./amf0-reader";
import write_amf0 from "./amf0-writer";
import FLVWriter from "./flv-writer";

const options = {
  port: {
    type: 'string',
    default: '1935',
  },
  flv: {
    type: 'string',
  }
} as const ;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.port, 10))) {
  console.error('Please Specify Valid PORT number');
  process.exit(-1);
}
const port = Number.parseInt(args.port, 10);
const output = args.flv == null ? null : args.flv === '-' ? process.stdout : fs.createWriteStream(args.flv);

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
} as const;

const handle_rtmp = async (connection: Duplex) => {
  const reader = new AsyncByteReader();
  connection.on('data', (data: Buffer) => { reader.push(data); })
  const writer = output != null ? new FLVWriter(output) : null;

  /*
   * RTMPのハンドシェイクを処理する
   */
  {
    // C0/S0
    await reader.readU8(); // Read C0
    connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
    // C1/S1
    const c1 = await reader.read(1536); // read C1
    const c1_random = c1.subarray(8);
    const s1_random = randomBytes(1536 - 8);
    const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
    connection.write(s1); // write C1
    // C2/S2
    const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
    connection.write(s2); // write S2
    const c2 = await reader.read(1536); // read C2
    const c2_random_echo = c2.subarray(8);
    // Check Random Echo
    if (s1_random.byteLength !== c2_random_echo.byteLength || !s1_random.equals(c2_random_echo)) {
      throw new Error('Invalid Random Echo');
    }
  }
  /*
   * RTMPのメッセージを処理する
   */
  let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
  for await (const message of read_message(reader)) {
    switch (state) {
      // 本当は共通で処理するシステムメッセージ部分を書く、大体の打ち上げRTMPではなくても良いので省略

      case STATE.WAITING_CONNECT: {
        if (message.message_stream_id !== 0) { continue; }
        if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
        const command = read_amf0(message.data);

        const name = command[0];
        if (name !== 'connect') { continue; }
        const transaction_id = command[1];
        const appName = command[2]['app'];
        console.error(`appName: ${appName}`);

        const result = write_amf0(
          '_result',
          transaction_id, // connect に対する _result の transaction_id は 1
          {
            fmsVer: 'FMS/3,5,7,7009',
            capabilities: 31,
            mode: 1,
          }, {
            code: 'NetConnection.Connect.Success',
            description: 'Connection succeeded.',
            data: { version: '3,5,7,7009' },
            objectEncoding: 0, // 0 = AMF0, 3 = AMF3
            level: 'status', // 正常系
          }
        );
        connection.write(write_message({
          message_type_id: MessageType.CommandAMF0,
          message_stream_id: 0,
          timestamp: 0,
          data: result,
        }));

        state = STATE.WAITING_CREATESTREAM;
        break;
      }
      case STATE.WAITING_CREATESTREAM: {
        if (message.message_stream_id !== 0) { continue; }
        if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
        const command = read_amf0(message.data);

        const name = command[0];
        if (name !== 'createStream') { continue; }
        const transaction_id = command[1];

        // message_stream_id は 0 が予約されている (今使ってる) ので 1 を利用する
        const result = write_amf0('_result', transaction_id, null, 1);
        connection.write(write_message({
          message_type_id: MessageType.CommandAMF0,
          message_stream_id: 0,
          timestamp: 0,
          data: result,
        }));

        state = STATE.WAITING_PUBLISH;
        break;
      }
      case STATE.WAITING_PUBLISH: {
        if (message.message_stream_id !== 1) { continue; }
        if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
        const command = read_amf0(message.data);

        const name = command[0];
        if (name !== 'publish') { continue; }
        const transaction_id = command[1];
        const streamKey = command[3];
        console.error(`streamKey: ${streamKey}`);

        const result = write_amf0(
          'onStatus',
          transaction_id,
          null,
          {
            code: 'NetStream.Publish.Start',
            description: 'Publish Accepted',
            level: 'status' // 正常系
          }
        );
        connection.write(write_message({
          message_type_id: MessageType.CommandAMF0,
          message_stream_id: message.message_stream_id,
          timestamp: 0,
          data: result,
        }));

        state = STATE.PUBLISHED;
        break;
      }
      case STATE.PUBLISHED: {
        writer?.write(message);
        break;
      }
    }
  }
}

const server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle_rtmp(connection);
});
server.listen(port);

まとめ

RTMPを実装して理解するというテーマでやってみましたが、いかがでしたでしょうか。RTMPは高機能ですが、打ち上げで使う部分はごく少数の部分だけで、FMSが現役の時はともかく、もうRTMPは最低限の仕様がRTMPのPDFをベースに空気で決まってる感じになっています。受信するだけならある程度ルールを守ればできるという感触を得てもらえれば幸いです。

ちなみに、コード行数を数えると600行弱でした。Node.jsの標準ライブラリだけ使うようにしていますが、例示用のコードとして、行数は多くなく実装できているんじゃないかと思います。本当はエラー処理などの部分が必要なのでもうちょっと行数が増えますが、理解のための実装なら問題ないでしょう。そんな行数でできるんだったら俺にもできるわ、と思っていただいたら嬉しく、そういった方は是非RTMPの受信モジュールを作って公開してください。待ってます。

実際のところ、RTMPを実装する機会はほとんどの場合ないと思いますが、RTMPがどうなっているかを知ることは配信技術を扱う上で重要だと思います。ffmpegやgstreamer,nginx-rtmp-module,wowzaなどのツールやAWSやGoogle Cloudのマネージドなメディア系エコシステムの使い方を知っていることももちろん重要ですが、根幹のRTMPの知識を持っていれば、ツールやエコシステムでは対応してない場合にも自分で対応できる応用力が生まれます。既存のツールでは思想的にやりたいことができない場合には自作することもできるわけです。嬉しいことだらけですね。

というわけで、みなさんもぜひ配信スタックを自作してみましょう。楽しい世界が待っていますよ 。

おまけ

今回のブログ記事で紹介したコード片を1ファイルにまとめてみました。このTypeScriptのコードをファイルに貼り付けて--experimental-strip-typesで動かせばNode.jsだけで動きますので、是非試してFLVファイルを作ってみてください。

1ファイルまとめ版 (クリックで展開)

bundled.mts

import net from "node:net";
import { randomBytes } from "node:crypto";
import { Duplex, Writable } from "node:stream";
import fs from "node:fs";
import { parseArgs } from "node:util";

class AsyncByteReader {
  private buffers: Buffer[] = [];
  private offset = 0;
  private totals = 0;
  private promises: [number, (result: Buffer) => void][] = [];

  private fulfill(): void {
    while (this.promises.length > 0) {
      const [length, resolve] = this.promises[0];
      if (this.totals < length) { break; }

      const drained = [];
      let remains = length;
      while (this.buffers.length > 0) {
        const buffer = this.buffers[0];
        const capacity = buffer.byteLength - this.offset;

        if (capacity > remains) {
          drained.push(buffer.subarray(this.offset, this.offset + remains));
          this.offset += remains;
          break;
        }

        drained.push(buffer.subarray(this.offset));
        this.buffers.shift();
        this.offset = 0;
        remains -= capacity;
      }

      this.totals -= length;
      resolve(Buffer.concat(drained));
      this.promises.shift();
    }
  }

  public push(buffer: Buffer): void {
    this.buffers.push(buffer);
    this.totals += buffer.byteLength;
    this.fulfill();
  }

  public async read(size: number): Promise<Buffer> {
    const { promise, resolve } = Promise.withResolvers<Buffer>();
    this.promises.push([size, resolve]);
    this.fulfill();
    return promise;
  }

  public async readUIntBE(length: number): Promise<number> {
    return (await this.read(length)).readUIntBE(0, length);
  }

  public async readUIntLE(length: number): Promise<number> {
    return (await this.read(length)).readUIntLE(0, length);
  }

  public readU8(): Promise<number> {
    return this.readUIntBE(1);
  }

  public readU16BE(): Promise<number> {
    return this.readUIntBE(2);
  }

  public readU24BE(): Promise<number> {
    return this.readUIntBE(3);
  }

  public readU32BE(): Promise<number> {
    return this.readUIntBE(4);
  }

  public readU16LE(): Promise<number> {
    return this.readUIntLE(2);
  }

  public readU24LE(): Promise<number> {
    return this.readUIntLE(3);
  }

  public readU32LE(): Promise<number> {
    return this.readUIntLE(4);
  }
}

class ByteReader {
  private buffer: Buffer;
  private offset = 0;

  public constructor(buffer: Buffer) {
    this.buffer = buffer;
  }

  public isEOF(): boolean {
    return this.offset >= this.buffer.byteLength;
  }

  public read(length: number): Buffer {
    if (length < 0) { length = this.buffer.byteLength - this.offset; }
    const value = this.buffer.subarray(this.offset, this.offset + length);
    this.offset += length;
    return value;
  }

  public readUIntBE(length: number): number {
    return this.read(length).readUIntBE(0, length);
  }

  public readIntBE(length: number): number {
    return this.read(length).readIntBE(0, length);
  }

  public readU8(): number {
    return this.readUIntBE(1);
  }

  public readU16BE(): number {
    return this.readUIntBE(2);
  }

  public readU24BE(): number {
    return this.readUIntBE(3);
  }

  public readU32BE(): number {
    return this.readUIntBE(4);
  }

  public readI8(): number {
    return this.readIntBE(1);
  }

  public readI16BE(): number {
    return this.readIntBE(2);
  }

  public readI24BE(): number {
    return this.readIntBE(3);
  }

  public readI32BE(): number {
    return this.readIntBE(4);
  }

  public readF64BE(): number {
    return this.read(8).readDoubleBE(0);
  }
}

const UTF8Decoder = new TextDecoder('utf-8');
const scriptend: unique symbol = Symbol();

const read_string = (reader: ByteReader): string => {
  const length = reader.readU16BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_longstring = (reader: ByteReader): string => {
  const length = reader.readU32BE();
  return UTF8Decoder.decode(reader.read(length));
}

const read_object = (reader: ByteReader): Record<string, any> => {
  const object: Record<string, any> = {};
  while (true) {
    const name = read_string(reader);
    const val = read_value(reader);
    if (val === scriptend) { return object; }
    object[name] = val;
  }
}

const read_mixedarray = (reader: ByteReader): Record<string, any>  => {
  reader.readU32BE(); // length
  return read_object(reader);
}

const read_strictarray = (reader: ByteReader): any[] => {
  const length = reader.readU32BE();
  const array = [];
  for (let i = 0; i < length; i++) {
    array.push(read_value(reader));
  }
  return array;
}

const read_date = (reader: ByteReader): Date => {
  const timestamp = reader.readF64BE();
  const localtimeoffset = reader.readI16BE();
  return new Date(timestamp);
}

const read_value = (reader: ByteReader): any => {
  const tag = reader.readU8();
  switch (tag) {
    case 0: return reader.readF64BE();
    case 1: return reader.readU8() !== 0;
    case 2: return read_string(reader);
    case 3: return read_object(reader);
    case 4: throw new Error('Unsupported Tag: 4 (movie clip)');
    case 5: return null;
    case 6: return undefined;
    case 7: throw new Error('Unsupported Tag: 7 (reference)');
    case 8: return read_mixedarray(reader);
    case 9: return scriptend;
    case 10: return read_strictarray(reader);
    case 11: return read_date(reader);
    case 12: return read_longstring(reader);
  }
  throw new Error(`Invalid tag: ${tag}`);
}

const read_amf0 = (data: Buffer): any[] => {
  const reader = new ByteReader(data);
  const result = [];
  while (!reader.isEOF()) {
    result.push(read_value(reader));
  }
  return result;
}

const UTF8Encoder = new TextEncoder();

const write_number = (data: number): Buffer => {
  const buffer = Buffer.alloc(8)
  buffer.writeDoubleBE(data);
  return buffer;
}

const write_string = (data: string): Buffer => {
  const utf8 = Buffer.from(UTF8Encoder.encode(data));
  const length = Buffer.alloc(2);
  length.writeUInt16BE(utf8.byteLength);
  return Buffer.concat([length, utf8]);
}

const write_object = (obj: Record<string, any>): Buffer => {
  return Buffer.concat([... Object.entries(obj).map(([k, v]) => Buffer.concat([write_string(k), write_value(v)])), Buffer.from([0x00, 0x00, 0x09])]);
}

const write_array = (data: any[]): Buffer => {
  const length = Buffer.alloc(4);
  length.writeUInt32BE(data.length);
  return Buffer.concat([length, ... data.map((datum) => write_value(datum))]);
}

const write_date = (data: Date): Buffer => {
  const buffer = Buffer.alloc(10);
  buffer.writeDoubleBE(data.getTime(), 0);
  buffer.writeInt16BE(0, 8); // reserved
  return buffer;
}

const write_value = (data: any): Buffer => {
  if (data === null) { return Buffer.from([0x05]); }
  if (data === undefined) { return Buffer.from([0x06]); }
  if (Array.isArray(data)) { return Buffer.concat([Buffer.from([0x0a]), write_array(data)]); }
  if (data instanceof Date) { return Buffer.concat([Buffer.from([0x0b]), write_date(data)]); }
  switch (typeof data) {
    case 'number': return Buffer.concat([Buffer.from([0x00]), write_number(data)]);
    case 'boolean': return Buffer.from([0x01, data ? 1 : 0]);
    case 'string': return  Buffer.concat([Buffer.from([0x02]), write_string(data)]);
    case 'object': return Buffer.concat([Buffer.from([0x03]), write_object(data)]);
    default: return Buffer.from([]);
  }
}

const write_amf0 = (... data: any[]): Buffer => {
  return Buffer.concat(data.map((datum) => write_value(datum)));
}

type MessageInformation = {
  message_type_id: number;
  message_stream_id: number
  message_length: number;
  timestamp: number;
  timestamp_delta: number | null;
  is_extended_timestamp: boolean;
}

const MessageType = {
  SetChunkSize: 1,
  Abort: 2,
  Acknowledgement: 3,
  UserControl: 4,
  WindowAcknowledgementSize: 5,
  SetPeerBandwidth: 6,
  Audio: 8,
  Video: 9,
  DataAMF3: 15,
  CommandAMF3: 17,
  DataAMF0: 18,
  CommandAMF0: 20,
} as const;

type Message = Omit<MessageInformation, 'timestamp_delta' | 'is_extended_timestamp'> & {
  data: Buffer;
}

async function* read_message(reader: AsyncByteReader): AsyncIterable<Message> {
  let chunk_maximum_size = 128; // システムメッセージにより変化する
  const informations = new Map<number, MessageInformation>();
  const chunks = new Map<number, Buffer[]>();
  const lengths = new Map<number, number>();

  while (true) {
    const basic = await reader.readU8();
    const fmt = (basic & 0b11000000) >> 6;
    let cs_id = (basic & 0b00111111) >> 0;
    switch (cs_id) {
      case 0: cs_id = 64 + await reader.readU8(); break;
      case 1: cs_id = 64 + await reader.readU16LE(); break;
    }
    if (fmt !== 3) { chunks.set(cs_id, []); lengths.set(cs_id, 0); }
    const chunk_buffers = chunks.get(cs_id) ?? [];
    const chunk_length = lengths.get(cs_id) ?? 0;
    let information = informations.get(cs_id);

    let timestamp = fmt !== 3 ? await reader.readU24BE() : information?.timestamp;
    const message_length = fmt === 0 || fmt === 1 ? await reader.readU24BE() : information?.message_length;
    const message_type_id = fmt === 0 || fmt === 1 ? await reader.readU8() : information?.message_type_id;
    const message_stream_id = fmt === 0 ? await reader.readU32LE() : information?.message_stream_id;
    // 情報がない時は何もできないしパースもできない。今回は簡単のため受信をやめて return する
    const is_extended_timestamp = fmt !== 3 ? timestamp === 0xFFFFFF : information?.is_extended_timestamp;
    if (timestamp == null || message_length == null || message_type_id == null || message_stream_id == null || is_extended_timestamp == null) { return; }
    const extended_timestamp = is_extended_timestamp ? await reader.readU32BE() : null;

    let timestamp_delta = null;
    if (fmt === 1 || fmt === 2) {
      if (information?.timestamp == null) { return; }
      timestamp_delta = extended_timestamp ?? timestamp;
      timestamp = information.timestamp + timestamp_delta;
    } else if (fmt === 3) {
      timestamp_delta = extended_timestamp ?? information?.timestamp_delta ?? timestamp;
      timestamp += (chunk_length > 0) ? 0 : timestamp_delta;
    } else {
      timestamp = extended_timestamp ?? timestamp;
    }

    information = { message_type_id, message_stream_id, message_length, timestamp, timestamp_delta, is_extended_timestamp } satisfies MessageInformation;
    const chunk = await reader.read(Math.min(message_length - chunk_length, chunk_maximum_size));
    const length = chunk_length + chunk.byteLength;
    if (length >= message_length) {
      if (length === message_length) {
        const data = Buffer.concat([... chunk_buffers, chunk])
        if (message_type_id === MessageType.SetChunkSize) {
          chunk_maximum_size = Math.max(1, Math.min(2 ** 31 - 1, data.readUInt32BE(0)));
        } else {
          const { timestamp_delta, is_extended_timestamp, ... message } = information;
          yield { ... message, data };
        }
      }
      chunks.set(cs_id, []);
      lengths.set(cs_id, 0);
    } else {
      chunks.set(cs_id, [... chunk_buffers, chunk]);
      lengths.set(cs_id, length);
    }
    informations.set(cs_id, information);
  }
}

const write_message = (message: Omit<Message, 'message_length'>): Buffer => {
  let chunk_maximum_size = 128;
  const result: Buffer[] = [];

  for (let i = 0; i < message.data.byteLength; i += chunk_maximum_size) {
    const chunk = message.data.subarray(i, Math.min(message.data.byteLength, i + chunk_maximum_size));
    const cs_id = 3; // こちらからはシステム系のメッセージしか送らないので 3 に固定する
    const fmt = i === 0 ? 0 : 3; // 簡単化のため 0 か 3 以外は使わない
    const basic = Buffer.from([(fmt << 6) | cs_id]);

    if (fmt === 3) {
      result.push(basic, chunk);
      continue;
    }

    const header = Buffer.alloc(message.timestamp >= 0xFFFFFF ? 15 : 11);
    header.writeUIntBE(Math.min(message.timestamp, 0xFFFFFF), 0, 3);
    header.writeUIntBE(message.data.byteLength, 3, 3);
    header.writeUIntBE(message.message_type_id, 6, 1);
    header.writeUIntLE(message.message_stream_id, 7, 4);
    if (message.timestamp >= 0xFFFFFF) {
      header.writeUIntBE(message.timestamp, 11, 4);
    }

    result.push(basic, header, chunk);
  }

  return Buffer.concat(result);
}

class FLVWriter {
  private output: Writable;
  private previousTagSize: number = 0;

  public constructor(output: Writable) {
    this.output = output;
    // FLV Header
    this.output.write(Buffer.from([
      0x46, 0x4C, 0x56, // Signature (FLV)
      1, // version
      4 | 1, // 4: Audio Present, 1: Video Present
      0, 0, 0, 9, // Header Bytes
    ]))
  }

  public write(message: Message): void {
    if (message.message_type_id !== MessageType.Audio && message.message_type_id !== MessageType.Video) { return; }

    const header = Buffer.alloc(15);
    header.writeUIntBE(this.previousTagSize, 0, 4);
    header.writeUIntBE(message.message_type_id, 4, 1)
    header.writeUIntBE(message.data.byteLength, 5, 3);
    header.writeUInt8(Math.floor(message.timestamp / (2 ** 16)) % (2 ** 8),  8);
    header.writeUInt8(Math.floor(message.timestamp / (2 **  8)) % (2 ** 8),  9);
    header.writeUInt8(Math.floor(message.timestamp / (2 **  0)) % (2 ** 8), 10);
    header.writeUInt8(Math.floor(message.timestamp / (2 ** 24)) % (2 ** 8), 11);
    header.writeUIntBE(0, 12, 3);

    this.output.write(header);
    this.output.write(message.data);
    this.previousTagSize = (header.byteLength + message.data.byteLength) - 4;
  }
}

const options = {
  port: {
    type: 'string',
    default: '1935',
  },
  flv: {
    type: 'string',
  }
} as const ;
const { values: args } = parseArgs({ options, tokens: true });
if (Number.isNaN(Number.parseInt(args.port, 10))) {
  console.error('Please Specify Valid PORT number');
  process.exit(-1);
}
const port = Number.parseInt(args.port, 10);
const output = args.flv == null ? null : args.flv === '-' ? process.stdout : fs.createWriteStream(args.flv);

const STATE = {
  WAITING_CONNECT: 'WAITING_CONNECT',
  WAITING_CREATESTREAM: 'WAITING_CREATESTREAM',
  WAITING_PUBLISH: 'WAITING_PUBLISH',
  PUBLISHED: 'PUBLISHED',
} as const;

const handle_rtmp = async (connection: Duplex) => {
  const reader = new AsyncByteReader();
  connection.on('data', (data: Buffer) => { reader.push(data); })
  const writer = output != null ? new FLVWriter(output) : null;

  /*
   * RTMPのハンドシェイクを処理する
   */
  {
    // C0/S0
    await reader.readU8(); // Read C0
    connection.write(Buffer.from([0x03])); // Write S0 (Version: 3)
    // C1/S1
    const c1 = await reader.read(1536); // read C1
    const c1_random = c1.subarray(8);
    const s1_random = randomBytes(1536 - 8);
    const s1 = Buffer.concat([Buffer.alloc(8), s1_random]);
    connection.write(s1); // write C1
    // C2/S2
    const s2 = Buffer.concat([Buffer.alloc(8), c1_random]);
    connection.write(s2); // write S2
    const c2 = await reader.read(1536); // read C2
    const c2_random_echo = c2.subarray(8);
    // Check Random Echo
    if (s1_random.byteLength !== c2_random_echo.byteLength || !s1_random.equals(c2_random_echo)) {
      throw new Error('Invalid Random Echo');
    }
  }
  /*
   * RTMPのメッセージを処理する
   */
  let state: (typeof STATE)[keyof typeof STATE] = STATE.WAITING_CONNECT;
  for await (const message of read_message(reader)) {
    switch (state) {
      // 本当は共通で処理するシステムメッセージ部分を書く、大体の打ち上げRTMPではなくても良いので省略

      case STATE.WAITING_CONNECT: {
        if (message.message_stream_id !== 0) { continue; }
        if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
        const command = read_amf0(message.data);

        const name = command[0];
        if (name !== 'connect') { continue; }
        const transaction_id = command[1];
        const appName = command[2]['app'];
        console.error(`appName: ${appName}`);

        const result = write_amf0(
          '_result',
          transaction_id, // connect に対する _result の transaction_id は 1
          {
            fmsVer: 'FMS/3,5,7,7009',
            capabilities: 31,
            mode: 1,
          }, {
            code: 'NetConnection.Connect.Success',
            description: 'Connection succeeded.',
            data: { version: '3,5,7,7009' },
            objectEncoding: 0, // 0 = AMF0, 3 = AMF3
            level: 'status', // 正常系
          }
        );
        connection.write(write_message({
          message_type_id: MessageType.CommandAMF0,
          message_stream_id: 0,
          timestamp: 0,
          data: result,
        }));

        state = STATE.WAITING_CREATESTREAM;
        break;
      }
      case STATE.WAITING_CREATESTREAM: {
        if (message.message_stream_id !== 0) { continue; }
        if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
        const command = read_amf0(message.data);

        const name = command[0];
        if (name !== 'createStream') { continue; }
        const transaction_id = command[1];

        // message_stream_id は 0 が予約されている (今使ってる) ので 1 を利用する
        const result = write_amf0('_result', transaction_id, null, 1);
        connection.write(write_message({
          message_type_id: MessageType.CommandAMF0,
          message_stream_id: 0,
          timestamp: 0,
          data: result,
        }));

        state = STATE.WAITING_PUBLISH;
        break;
      }
      case STATE.WAITING_PUBLISH: {
        if (message.message_stream_id !== 1) { continue; }
        if (message.message_type_id !== MessageType.CommandAMF0) { continue; }
        const command = read_amf0(message.data);

        const name = command[0];
        if (name !== 'publish') { continue; }
        const transaction_id = command[1];
        const streamKey = command[3];
        console.error(`streamKey: ${streamKey}`);

        const result = write_amf0(
          'onStatus',
          transaction_id,
          null,
          {
            code: 'NetStream.Publish.Start',
            description: 'Publish Accepted',
            level: 'status' // 正常系
          }
        );
        connection.write(write_message({
          message_type_id: MessageType.CommandAMF0,
          message_stream_id: message.message_stream_id,
          timestamp: 0,
          data: result,
        }));

        state = STATE.PUBLISHED;
        break;
      }
      case STATE.PUBLISHED: {
        writer?.write(message);
        break;
      }
    }
  }
}

const server = net.createServer(async (connection) => {
  connection.setNoDelay(true);
  await handle_rtmp(connection);
});
server.listen(port);