みずぴー日記

月に行こうという目標があったから、アポロは月に行けた。飛行機を改良した結果、月に行けたわけではない。

🔬redux-saga

redux-sagaの動きを調べた。

redux-sagaは redux-sagaで非同期処理と戦うで説明されているように、非同期処理などを直列プログラムのような形式(直接形式; direct style) で書くためのライブラリである。 そのためにタスクを導入し、その切り替えを制御している。

複数のタスクを協調制御するという点で、コルーチンや軽量スレッド、fiberなどに類似していると感じた。

🔎対象

redux-saga v0.15.3を対象とする。ただし一部コードは説明のためにエラー処理や終了処理を省略する。

また counter-vanilla を元にした以下のプログラムの動きを追う。

// counter.js
//////////////////////////////////////////////////////////////////////////
//  Reducerの定義
// INCREMENTが来たら +1 する reducer
function counter(state, action) {
  if (typeof state === 'undefined') {
    return 0
  }
  switch (action.type) {
    case 'INCREMENT':
      return state + 1
    default:
      return state
  }
}

//////////////////////////////////////////////////////////////////////////
//  Sagaの定義
const effects = ReduxSaga.effects
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

// INCREMENT_ASYNC の1秒後にINCREMENTを発生させる。
function* counterSaga() {
  while(1) {
    yield effects.take('INCREMENT_ASYNC')
    yield effects.call(delay, 1000)
    yield effects.put({type: 'INCREMENT'})
  }
}

//////////////////////////////////////////////////////////////////////////
//  redux-sagaの初期化
const createSagaMiddleware = ReduxSaga.default
const sagaMiddleware = createSagaMiddleware()
var store = Redux.createStore(
    counter,
    Redux.applyMiddleware(sagaMiddleware))

//////////////////////////////////////////////////////////////////////////
//  タスクの初期化・実行
sagaMiddleware.run(counterSaga)

//////////////////////////////////////////////////////////////////////////
//  イベントハンドラ
document
  .getElementById('incrementAsync')
  .addEventListener('click', function () {
    store.dispatch({ type: 'INCREMENT_ASYNC' })
  })

これは Increment async ボタンを押すと、1秒後にカウンタがインクリメントされるプログラムである。

f:id:mzp:20170507225036g:plain

🚀redux-sagaの初期化

reducer等を定義したのち、以下のようにredux-sagaの初期化を行なう。

// counter.js
const sagaMiddleware = createSagaMiddleware()
Redux.createStore(
    ...,
    Redux.applyMiddleware(sagaMiddleware))

createSagaMiddleware

createSagaMiddleware は以下のように定義されている。

// src/internal/middleware.js
export default function sagaMiddlewareFactory({ context = {}, ...options } = {}) {
  // 渡されたオプションが妥当であることを確認する
  if(logger && !is.func(logger)) {
    throw new Error('`options.logger` passed to the Saga middleware is not a function!')
  }
  // sagaMiddlewareを定義する
  function sagaMiddleware({ getState, dispatch }) {  /* snip */ }

  // 定義した関数を返す
  return sagaMiddleware
}

引数が妥当であることを確認をした上で、内部で定義した sagaMiddlewareを返す。

sagaMiddleware.runの初期化

sagaMiddlewareapplyMiddleware の内部で呼び出される。 この関数は以下のような定義されている。

// src/internal/middleware.js
function sagaMiddleware({ getState, dispatch }) {
  // sagaMiddleware.run を初期化する
  sagaMiddleware.run = runSaga.bind(null, {
    context,
    dispatch,
    /* snip */
  })

  return next => action => { /* snip */ }
}

sagaMiddleware.runrunSaga を代入し、sagaの実行をできるようにする。 この際、 Function.prototype.bind を使って dispatch などのReduxとやりとりするために必要な関数が runSaga に渡されるようにしている。

Reduxのミドルウェアとして動く next => action => .... については、イベントハンドラの動きを追う際に見る。

🏃タスクの作成・実行

// counter.js
sagaMiddleware.run(counterSaga)

sagaMiddleware.run によって counterSaga の実行が開始される。

タスクの生成

sagaMiddleware.run には runSaga が代入されている。 これは以下のような定義となっている。

// src/internal/runSaga.js
export function runSaga(
  storeInterface,
  saga,
  ...args
) {
  // ジェネレータを呼び出す
  let iterator = saga(...args)

  // タスクを作成する
  const task = proc(
    iterator,
    /* snip */
  )

  return task
}

saga(...args)counterSaga を呼び出している。 これはジェネレータなので、ここではイテレータが返るだけで関数本体は実行されない。

ここで作ったイテレータprocに渡し、タスクを生成する。 proc は以下のようなコードになっている。

// src/internal/proc.js
export default function proc(iterator, /* snip */) {
  // タスクを作る
  const task = newTask(parentEffectId, name, iterator, cont)

  // タスクを実行するnext を呼ぶ
  next()

  // タスクを返す
  return task

  // タスクを実行する関数
  function next(arg, isErr) { /* snip*/ }
}

newTask によって、タスクを管理するオブジェクトを生成している。 その後、タスクを実行する nextを呼び出したのち、タスクを返している。

タスクの実行

next は以下のようなコードで定義される。

// src/internal/proc.js
function next(arg, isErr) {
  // イテレータを進め、次のyieldまでを実行する
  let result = iterator.next(arg)

  // 返ってきた値に応じて処理をする
  runEffect(result.value, parentEffectId, '', next)
}

iterator.next(arg)イテレータを進め、その返り値をrunEffect に渡している。

runEffect での処理が完了したのち、タスクの実行を再開できるようにするため、 runEffect には自分自身である next を渡している。

📤アクションを待つ

counterSaga は以下のように定義されているので、イテレータが進めたれた際に yield effects.take('INCREMENT_ASYNC') まで実行される。

// counter.js
function* counterSaga() {
  while(1) {
    yield effects.take('INCREMENT_ASYNC') // <- ここまで実行される
    yield effects.call(delay, 1000)
    yield effects.put({type: 'INCREMENT'})
  }
}

effects.take('INCREMENT_ASYNC') の返り値は以下のようなオブジェクトになっており、これがそのままiterator.next() の返り値になる。

{
  "@@redux-saga/IO": true,
  "TAKE": { "pattern": "INCREMENT_ASYNC" }
}

このオブジェクトが runEffect に渡されると以下のような分岐を経て、 runTakeEffect に渡される。

// src/internal/io.js
// どの種別のエフェクトなのかを判定するための関数群を定義する
const TAKE = 'TAKE'
const createAsEffectType = type => effect => effect && effect[IO] && effect[type]
export const asEffect = {
  take : createAsEffectType(TAKE)
}

// src/internal/proc.js
function runEffect(effect, parentEffectId, label = '', cb) {
  let data
  // effectの種類に応じて、専用の関数を呼ぶ
  return (
    // Non declarative effect
      is.promise(effect)                      ? resolvePromise(effect, cb)
    : is.helper(effect)                       ? runForkEffect(wrapHelper(effect), effectId, cb)
    : is.iterator(effect)                     ? resolveIterator(effect, effectId, name, cb)
     // declarative effects
    : is.array(effect)                        ? runParallelEffect(effect, effectId, cb)
    : (data = asEffect.take(effect))          ? runTakeEffect(data, cb)
    : (data = asEffect.put(effect))           ? runPutEffect(data, cb)
    : (data = asEffect.all(effect))           ? runAllEffect(data, effectId, cb)
    : (data = asEffect.race(effect))          ? runRaceEffect(data, effectId, cb)
    : (data = asEffect.call(effect))          ? runCallEffect(data, effectId, cb)
    : (data = asEffect.cps(effect))           ? runCPSEffect(data, cb)
    : (data = asEffect.fork(effect))          ? runForkEffect(data, effectId, cb)
    : (data = asEffect.join(effect))          ? runJoinEffect(data, cb)
    : (data = asEffect.cancel(effect))        ? runCancelEffect(data, cb)
    : (data = asEffect.select(effect))        ? runSelectEffect(data, cb)
    : (data = asEffect.actionChannel(effect)) ? runChannelEffect(data, cb)
    : (data = asEffect.flush(effect))         ? runFlushEffect(data, cb)
    : (data = asEffect.cancelled(effect))     ? runCancelledEffect(data, cb)
    : (data = asEffect.getContext(effect))    ? runGetContextEffect(data, cb)
    : (data = asEffect.setContext(effect))    ? runSetContextEffect(data, cb)
    : /* anything else returned as is        */              cb(effect)
  )
}

チャンネルへの登録

runTakeEffect は以下のような定義となっている。

// src/internal/proc.js
function runTakeEffect({channel, pattern, maybe}, cb) {
  channel = channel || stdChannel
  channel.take(cb, matcher(pattern))
}

// src/internal/channel.js
function take(cb, matcher) {
  cb[MATCH] = matcher
  takers.push(cb)
}

runTakeEffect では、タスク間の通信に使われるチャンネルに対して take を呼び、 チャンネルの takers 配列に cb を追加している。 この cbnext であるため、あとでこれを呼び出せばcounterSaga の実行が再開できる。

ここまでで sagaMiddleware.run の実行は完了し、redux-sagaの初期化が完了する。

👀イベントハンドラ

イベントハンドラを見ていく。

// counter.js
document
  .getElementById('incrementAsync')
  .addEventListener('click', function () {
    store.dispatch({ type: 'INCREMENT_ASYNC' })
  })

Increment async ボタンがクリックされると、Reduxのディスパッチャに INCREMENT_ASYNC アクションが渡される。

アクションの配信

先程は省略した sagaMiddleware は以下のように定義されている。

// src/internal/middleware.js
function sagaMiddleware({ getState, dispatch }) {
  const sagaEmitter = emitter()
  // ....
  return next => action => {
    // 次のミドルウェアにアクションを転送する
    const result = next(action)

    // アクションを配信する
    sagaEmitter.emit(action)
    return result
  }
}

次のミドルウェアにそのままアクションを転送することで、reducerを起動する。 その後、アクションを sagaEmitter.emit に渡す。

emitter

emitter は以下のように定義されており、 emit されると対応する subscribers が起動する。

// src/internal/channel.js
export function emitter() {
  const subscribers = []

  function subscribe(sub) {
    subscribers.push(sub)
    return () => remove(subscribers, sub)
  }

  function emit(item) {
    const arr = subscribers.slice()
    for (var i = 0, len =  arr.length; i < len; i++) {
      arr[i](item)
    }
  }

  return { subscribe,  emit }
}

チャンネル

チャンネルの take で利用していた stdChannel は以下のように定義されている。

// src/internal/proc.js
export default function proc(
  iterator,
  subscribe = () => noop,
  /* snip */
) {
  // procの引数として渡されたsubscribeを用いてチャンネルを作成する
  const stdChannel = _stdChannel(subscribe)
  ....
}

// src/internal/channel.js
export function _stdChannel(subscribe) {
  // eventChannel を用いてチャンネルを作成する
  const chan = eventChannel(cb => /* snip */)

  return {
    ...chan,
    take(cb, matcher) { /* snip */  }
  }
}

export function eventChannel(subscribe, buffer = buffers.none(), matcher) {
  const chan = channel(buffer)

  // 何かがemitされた場合は、それをチャンネルにputする
  subscribe(input => {
    chan.put(input)
  })

  return {
    take: chan.take,
    flush: chan.flush
  }
}

eventChannel で入力をそのままチャンネルに put する関数を登録している。 そのため、ディスパッチャに渡されたアクションが、チャンネルへと put される。

チャンネルへのput

チャンネルの put は以下のように定義されている。

// src/internal/channel.js
function put(input) {
  // takers配列が空の場合はバッファに追加する
  if (!takers.length) {
    return buffer.put(input)
  }

  // takers配列に関数が登録されている場合は、それに入力を渡す
  for (var i = 0; i < takers.length; i++) {
    const cb = takers[i]
    if(!cb[MATCH] || cb[MATCH](input)) {
      takers.splice(i, 1)
      return cb(input)
    }
  }
}

takers 配列に格納されている関数に入力を渡している。 今回は next が登録されているため、counterSaga の実行が再開される。

つまり、redux-sagaのミドルウェアからの put (emit)と、counterSagatake がチャンネルを挟んで対になって動作する。

f:id:mzp:20170508000833p:plain

🤝 プロミスの実行

counterSaga は 以下のように定義されているので、実行が再開されると effects.call(delay, 1000) まで実行される。

// counter.js
function* counterSaga() {
  while(1) {
    yield effects.take('INCREMENT_ASYNC') // <- さっきはここまで実行した
    yield effects.call(delay, 1000) // <- ここまで実行される
    yield effects.put({type: 'INCREMENT'})
  }
}

take の場合と同様に、この返り値は runEffect 内の分岐を経て、 runCallEffect に渡される。

// src/internal/proc.js (再掲)
function next(arg, isErr) {
  // イテレータを進め、次のyieldまでを実行する
  result = iterator.next(arg)

  // 返ってきた値に応じて処理をする
  runEffect(result.value, parentEffectId, '', next)
}

// src/internal/proc.js (再掲)
function runEffect(effect, parentEffectId, label = '', cb) {
  let data
  // effectの種類に応じて、専用の関数を呼ぶ
  return (
    // Non declarative effect
      is.promise(effect)                      ? resolvePromise(effect, cb)
    : is.helper(effect)                       ? runForkEffect(wrapHelper(effect), effectId, cb)
    : is.iterator(effect)                     ? resolveIterator(effect, effectId, name, cb)
     // declarative effects
    : is.array(effect)                        ? runParallelEffect(effect, effectId, cb)
    : (data = asEffect.take(effect))          ? runTakeEffect(data, cb)
    : (data = asEffect.put(effect))           ? runPutEffect(data, cb)
    : (data = asEffect.all(effect))           ? runAllEffect(data, effectId, cb)
    : (data = asEffect.race(effect))          ? runRaceEffect(data, effectId, cb)
    : (data = asEffect.call(effect))          ? runCallEffect(data, effectId, cb)
    : (data = asEffect.cps(effect))           ? runCPSEffect(data, cb)
    : (data = asEffect.fork(effect))          ? runForkEffect(data, effectId, cb)
    : (data = asEffect.join(effect))          ? runJoinEffect(data, cb)
    : (data = asEffect.cancel(effect))        ? runCancelEffect(data, cb)
    : (data = asEffect.select(effect))        ? runSelectEffect(data, cb)
    : (data = asEffect.actionChannel(effect)) ? runChannelEffect(data, cb)
    : (data = asEffect.flush(effect))         ? runFlushEffect(data, cb)
    : (data = asEffect.cancelled(effect))     ? runCancelledEffect(data, cb)
    : (data = asEffect.getContext(effect))    ? runGetContextEffect(data, cb)
    : (data = asEffect.setContext(effect))    ? runSetContextEffect(data, cb)
    : /* anything else returned as is        */              cb(effect)
  )
}

Promise.prototype.thenへの登録

runCallEffectは以下のように定義されている。

// src/internal/proc.js
function runCallEffect({context, fn, args}, effectId, cb) {
  // callの引数に渡された関数を起動する。  
  let result = fn.apply(context, args)

  // 返り値としてプロミスが返ってくるので、resolvePromiseに渡す
  return resolvePromise(result, cb)
}

function resolvePromise(promise, cb) {
 // Promise.prototype.then にコールバック関数を登録する
  promise.then(
    cb,
    error => cb(error, true)
  )
}

call エフェクトに渡された delayrunCallEffect 内で呼び出される。 その返り値となるプロミスは、resolvePromise に渡される。 resolvePromise 内では、Promise.prototype.thencb を登録する。

この cbcounterSaganext であるので、プロミスの実行が完了したのち counterSaga の実行が再開される。

🔈アクションのディスパッチ

プロミスの実行が完了したのち、nextによって effects.put({type: 'INCREMENT'}) まで実行が進む。

// counter.js
function* counterSaga() {
  while(1) {
    yield effects.take('INCREMENT_ASYNC')
    yield effects.call(delay, 1000) // <- さっきはここまで実行した
    yield effects.put({type: 'INCREMENT'}) // <- ここまで実行される
  }
}

take エフェクトや put エフェクトの場合と同様に、 runEffect を通じて runPutEffect が呼び出される。

Reduxへのディスパッチ

runPutEffect は以下のようになっている。

// src/internal/proc.js
function runPutEffect({action, resolve}, cb) {
  // Reduxのdispatchに引数を渡す
  let result = dispatch(action);

  // コールバック関数にその結果を渡す
  return cb(result)
}

引数に渡された {type: 'INCREMENT'} をそのままReduxのディスパッチャに渡す。 これにより counter reducer が動き、カウンタの値がインクリメントされる。

その後、cb に代入された next を呼び、counterSaga の実行を継続する。 counterSagaは以下のように定義され、effects.take('INCREMENT_ASYNC') までループし、これまでと同様の処理が続いていく。

// counter.js
function* counterSaga() {
  while(1) {
    yield effects.take('INCREMENT_ASYNC') // <- ここまで戻る
    yield effects.call(delay, 1000)
    yield effects.put({type: 'INCREMENT'}) // <- さっきはここまで実行した
  }
}

✅まとめ

簡単なシーケンス図にまとめると以下のようになる。 直列的に実行されるように書かれている counterSaga の処理が何度も中断され、条件が満たされるたびに実行が再開されている。

f:id:mzp:20170507223441p:plain

このようにredux-sagaではタスクの切り替えを制御することで、特定のアクションが来るのを待ったり、プロミスの完了を待つなどの処理を、直接形式で書けるようにしている。