QueueStorageTriggerの作成(Spring Boot)

Spring Boot でAzure FunctionsのQueueStorageTriggerを実装して、ローカルで動作確認する手順を示す。

プロジェクトの作成方法については、以下の記事参照。

olafnosuke.hatenablog.com

バージョン

Spring Boot:2.7.5
Java:17

ファンクションを新規追加する

ファンクションを新規追加する場合は、HandlerクラスとFunctionクラスをそれぞれ追加する。
この時、Handlerクラスはorg.springframework.cloud.function.adapter.azure.FunctionInvoker<I, O>を継承し、Functionクラスはjava.util.function.Function<T, R>を実装して作成する。
各ファンクションでDIしたいbeanがある場合はConfigurationクラスを作成する。

Configクラス作成

ファンクションクラスをbean登録するためのConfigクラスを作成する。 その他処理を行うにあたって必要なクラスがある場合は記述する。

// Configクラス
@Configuration
public class QueueStorageConfiguration {
    @Bean
    public QueueStorageFunction queueStorage() {
        return new QueueStorageFunction();
    }
}

Functionクラス作成

Functionクラスには、ファンクションごとに行いたい処理を記述する。

// Functionクラス
public class QueueStorageFunction implements Function<String, String> {
    public String apply(String s) {
        return s;
    }
}

Handlerクラス作成

Handlerクラスにはファンクションを呼び出す処理を記述する。

// Handlerクラス
public class QueueStorageHandler extends FunctionInvoker<String, String> {

    // Functionクラスのbean登録名とHandlerクラスの@FunctionName()の属性に
    // 指定した名前は一致させる
    @FunctionName("queueStorage")
    public void execute(
        @QueueTrigger(name = "message", queueName = "Storage", connection = "AzureWebJobsStorage") String message,
        final ExecutionContext context) {

        handleRequest(message, context);
    }
}

トリガーの設定

トリガーの設定はアノテーションで行う。@FunctionName("queueStorage")を付与したメソッドのパラメーターにcom.microsoft.azure.functions.annotation.QueueTriggerを付与する。
アノテーションの属性については以下の通り。

属性名 説明
name 関数シグネチャのパラメーター名を入力する。 関数がトリガーされると、このパラメーターの値にはキューメッセージの内容が含められる。必須
queueName ストレージ アカウントのキュー名を入力する。必須
connection ストレージ アカウントの接続文字列を入力する。必須

ファンクションを呼び出す処理

O FunctionInvoker<I, O>.handleRequest(I, ExecutionContext)メソッドでファンクションを呼び出すことが出来る。

// 記述例
String response = handleRequest(timerinfo, context);

ローカルでの動作確認

QueueStorageTriggerの起動には、Azure Blob Storageが必要。
今回は、ローカル環境で使用できるAzure Blob StorageのエミュレータであるAzuriteを使用する例を示す。

Azuriteの導入

コマンドプロンプトで以下のコマンドを実行する。

npm install -g azurite

インストール完了後にc:\\azuriteディレクトリを作成し、以下のコマンドでAzuriteを起動する

azurite --silent --location c:\\azurite --debug c:\\azurite\\debug.log

起動すると以下のようなログが表示される

C:>azurite --silent --location c:\\azurite --debug c:\\azurite\\debug.log
Azurite Blob service is starting at http://127.0.0.1:10000
Azurite Blob service is successfully listening at http://127.0.0.1:10000
Azurite Queue service is starting at http://127.0.0.1:10001
Azurite Queue service is successfully listening at http://127.0.0.1:10001
Azurite Table service is starting at http://127.0.0.1:10002
Azurite Table service is successfully listening at http://127.0.0.1:10002

起動中のAzuriteはCtrl + cで停止できる。

ファンクションの設定

ファンクションプロジェクトのlocal.settings.jsonを開き、AzureWebJobsStorageUseDevelopmentStorage=trueと設定する。

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "java",
    "MAIN_CLASS":"jp.co.sample.functions.Application"
  }
}

ファンクションの処理の中でAzureBlobStorageに対して処理を行っている場合、以下の接続設定を利用する。 以下はAzuriteの接続設定である。

設定
アカウントキー Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
アカウント名 devstoreaccount1
Blobエンドポイント http://127.0.0.1:10000/devstoreaccount1

Azurite起動中はMicrosoft Azure Storage Explorerの「ローカルで接続済み」->「ストレージアカウント」->「エミュレーター」で起動中のエミュレータの操作をすることも可能。

ファンクション動作確認

ファンクションの作成完了後にプロジェクトをビルドする。

gradlew azureFunctionsPackage

以下のコマンドでAzuriteを起動する

azurite --silent --location c:\\azurite --debug c:\\azurite\\debug.log

ファンクションを起動する。

gradlew azureFunctionsRun

起動に成功すると以下のようにファンクションの一覧が表示される。

> Task :azureFunctionsRun
Azure Function App's staging directory found at: C:\\pleiades-2021-06\\pleiades\\workspace\\hello-spring-function-azure-gradle\\build\\azure-functions\\java-functions

Azure Functions Core Tools
Core Tools Version:       3.0.3734 Commit hash: 61192bb28820be76916f85209916152801483456  (64-bit)
Function Runtime Version: 3.1.4.0

Functions:

        http: [POST] <http://localhost:7071/api/http>

        queue: queueTrigger

起動後にAzure Queue storage メッセージが作成されたタイミングでファンクションが実行される。

メッセージを手動で追加する場合は以下の手順を行う。

メッセージを追加したいコンテナをビューワーに表示して、「メッセージを追加」を押下

開いた子画面に、追加したいメッセージを入力して「OK」を押下

QueueStorageTriggerの設定変更

QueueStorageTriggerの設定を変更したい場合は、ファンクションプロジェクトのhost.jsonの「queues」の設定を編集する。

{
    "version": "2.0",
    "extensions": {
        "queues": {
            "maxPollingInterval": "00:00:02",
            "visibilityTimeout" : "00:00:30",
            "batchSize": 16,
            "maxDequeueCount": 5,
            "newBatchThreshold": 8,
            "messageEncoding": "base64"
        }
    }
}
プロパティ デフォルト値 説明
maxPollingInterval 0:01:00 キューのポーリングの最大間隔
最小間隔は100 ミリ秒
間隔は、maxPollingInterval を上限として指数関数的にインクリメントされる
visibilityTimeout 0:00:00 メッセージの処理が失敗したときの再試行間隔
batchSize 16 ファンクションが同時に取得して、並列で処理するキューメッセージの数
処理中のメッセージの数が newBatchThreshold まで減少すると、設定した数のメッセージを取得し、処理を開始する
1 つのファンクションにつき同時に処理されるメッセージの最大数は、batchSize に newBatchThreshold を加えた値となり、この制限は、キューによってトリガーされる各ファンクションに個別に適用される
maxDequeueCount 5 有害キューに移動する前に、メッセージの処理を試行する回数。
newBatchThreshold N*batchSize/2 同時に処理されているメッセージの数がこの数まで減少すると、新規でキューメッセージを取得する
Nは、App Service または Premiumプランで実行されている場合に使用可能な vCPU の数を表しており、従量課金プランの場合、「1」
messageEncoding base64 メッセージのエンコード形式
この設定は、バージョン 5.0.0 以降の拡張機能のバンドルでのみ使用可能
有効値は base64 もしくは none 

参考:host.json 設定


他のTriggerの作成に関する記事も書いています。

olafnosuke.hatenablog.com

olafnosuke.hatenablog.com

olafnosuke.hatenablog.com