Azure Cosmos DB EmulatorをJavaで操作する(実装編)

前回の記事でAzure Cosmos DB EmulatorにJavaから接続するための設定が完了したので、今回は実際にAzure Cosmos DB Emulatorを操作する処理を実装していく。

Azure Cosmos DB EmulatorにJavaから接続するための設定は以下の記事を参照 olafnosuke.hatenablog.com


azure-spring-data-cosmosを使用する。

依存関係追加

build.gradleに依存関係を追加する。
spring-cloud-azure-starter-data-cosmosを追加することで、azure-spring-data-cosmosも依存関係に追加される。
しかし、azure-spring-data-cosmosは最新バージョンを用いないと、ページャーありの検索を行う場合にかなりの時間がかかってしまう問題があるため、別途依存関係を追加することでバージョンの上書きを行っている。

dependencies {
    implementation 'com.azure.spring:spring-cloud-azure-starter-data-cosmos:4.3.0'
    implementation 'com.azure:azure-spring-data-cosmos:3.26.0'
}

application.ymlに接続先情報を設定する

以下の記述例では、直接ymlに設定値を記述しているが、環境変数から値を取得しても良い。

spring:
  cloud:
    azure:
      cosmos:
        endpoint: https://localhost:8081
        key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==

エンティティクラスの作成

import org.springframework.data.annotation.Id;

import com.azure.spring.data.cosmos.core.mapping.Container;
import com.azure.spring.data.cosmos.core.mapping.PartitionKey;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;

@Container(containerName = "mail_history")
public class MailHistory {

    /** ID */
    @Id
    private String id;

    /** メールアドレス */
    private String mail;

    /** 配信ステータス */
    private String status;

    /** 件名 */
    private String subject;

    /** 配信種別 */
    private String deliveryType;

    /** 配信日時 */
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
    private LocalDateTime deliveryDateTime;
    
    // getter, setter省略
}

リポジトリクラスの作成

com.azure.spring.data.cosmos.repository.CosmosRepositoryを継承する。
総称型には、エンティティクラスとIDの型を設定する。

import org.springframework.stereotype.Repository;
import com.azure.spring.data.cosmos.repository.CosmosRepository;

@Repository
public interface MailHistoryRepository extends CosmosRepository<MailHistory, String> {
}

ResponseDiagnosticsProcessorの実装

クエリの実行結果のRU値などをログ出力するために実装する。
以下はサンプルなので標準出力している。

import com.azure.spring.data.cosmos.core.ResponseDiagnostics;
import com.azure.spring.data.cosmos.core.ResponseDiagnostics.CosmosResponseStatistics;
import com.azure.spring.data.cosmos.core.ResponseDiagnosticsProcessor;

public class SampleResponseDiagnosticsProcessor implements ResponseDiagnosticsProcessor {

    /**
     * {@inheritDoc}
     */
    @Override
    public void processResponseDiagnostics(ResponseDiagnostics responseDiagnostics) {
        CosmosResponseStatistics statistics = responseDiagnostics.getCosmosResponseStatistics();
        System.out.println("◇◇" + responseDiagnostics);

        if (statistics != null) {
            System.out.println("◆◆" + statistics.getRequestCharge());
        }
    }
}

Configurationクラスの作成

com.azure.spring.data.cosmos.config.AbstractCosmosConfigurationを実装する。
getDatabaseName()メソッドの実装と、cosmosConfig()メソッドのオーバーライドを行う。
CosmosConfigの設定で、先ほど作成したResponseDiagnosticsProcessorの実装クラスの設定を行う。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.azure.spring.data.cosmos.config.AbstractCosmosConfiguration;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.repository.config.EnableCosmosRepositories;

/**
 * CosmosDB操作に必要なクラスを定義するためのConfigurationクラス。<br>
 */
@Configuration
@EnableCosmosRepositories
public class SampleConfiguration extends AbstractCosmosConfiguration {

    /**
     * {@inheritDoc}
     */
    @Override
    protected String getDatabaseName() {
        return "Test";
    }

    /**
     * {@inheritDoc}
     */
    @Bean
    @Override
    public CosmosConfig cosmosConfig() {
        return CosmosConfig.builder()
                .enableQueryMetrics(true)
                .responseDiagnosticsProcessor(new SampleResponseDiagnosticsProcessor())
                .build();
    }
}

DB操作処理の記述

@SpringBootApplication
public class SampleCosmosApplication implements CommandLineRunner {

    @Autowired
    private MailHistoryRepository repo;

    public static void main(String[] args) {
        SpringApplication.run(SampleCosmosApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        MailHistory mailHistory = new MailHistory();
        mailHistory.setId("0000");
        mailHistory.setMail("a".repeat(256));
        mailHistory.setStatus("02");
        mailHistory.setSubject("あ".repeat(512));
        mailHistory.setDeliveryType("01");
        mailHistory.setService("カ".repeat(30));
        mailHistory.setMailMagazine("カ".repeat(30));
        mailHistory.setDeliveryDateTime(LocalDateTime.now());

        // 1件登録
        repo.save(mailHistory);

        List<MailHistory> list = new ArrayList<>();
        Random random = new Random();
        for (int i = 1; i <= 200000; i++) {
            MailHistory history = new MailHistory();
            history.setId(Integer.toString(i));
            history.setMail("sample%d@co.jp".formatted(i));
            history.setStatus(Integer.toString(random.nextInt(6)));
            history.setDeliveryType(Integer.toString(random.nextInt(3)));

            if (i % 2 == 0) {
                    history.setService("◯◯サービス");
                    history.setSubject("キャンペーン通知");
                    history.setMailMagazine("キャンペーンメール");
                    history.setDeliveryDateTime(LocalDateTime.of(2022, 10, 20, 10, 20, 30, 500));
            } else {
                    history.setService("△△システム");
                    history.setSubject("ログイン通知");
                    history.setMailMagazine("ログイン通知メール");
                    history.setDeliveryDateTime(LocalDateTime.of(2022, 5, 10, 15, 20, 25, 500));
            }
            list.add(history);
        }

        // 複数件登録
        repo.saveAll(list);

        // ID検索
        Optional<MailHistory> findById = repo.findById("00");
        MailHistory mailHistory = findById.get();
        System.out.println(mailHistory);

        // ページャーあり全件検索
        Sort sort = Sort.by(Direction.ASC, "id");
        PageRequest page = PageRequest.of(0, 100, sort);
        Page<MailHistory> all = repo.findAll(page);
    }
}

日付データの登録について

日付データを登録する場合は、エンティティクラスの日付クラスのフィールドに以下のようにアノテーションを付与する。

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;

@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
private LocalDateTime deliveryDateTime;

アノテーションが付与されていない場合は以下のようなデータが登録されてしまう。

上記のアノテーションを付与することで、@JsonFormatに記載したフォーマットで登録されるようになる。


複数件登録について

CosmosRepositoryにデフォルトで用意されているsaveAllメソッドの実装は以下の通りであり、saveメソッドを複数回実行しているのみで、バルクインサートではない。(8万件くらいを登録するのに30分近くかかる)

@Override
public <S extends T> Iterable<S> saveAll(Iterable<S> entities) {
    Assert.notNull(entities, "Iterable entities should not be null");

    final List<S> savedEntities = new ArrayList<>();
    entities.forEach(entity -> {
        final S savedEntity = this.save(entity);
        savedEntities.add(savedEntity);
    });

    return savedEntities;
}

また、CosmosRepositoryにはバルクインサート用のメソッドは特に用意されていない。
バルクインサートを行いたい場合はcom.azure.spring.data.cosmos.core.CosmosTemplateクラスのrunQuery()メソッドに対して、自身で作成したクエリを指定してを呼び出すか、SDKのクライアントを使用することになる。


ページャーありの全件検索メソッドについて

CosmosRepositoryに用意されてるページャー機能ありの全件検索メソッドであるfindAll(Pageable)は、最初のページの検索であれば検索の実行自体は早いが、検索結果の全件数を取得するのがかなり遅い。
遅くなっている原因として、件数取得のカウントにもソート条件を設定していることにあった。

これはissuesにも挙がっており、対応策としてcom.azure.spring.data.cosmos.core.CosmosTemplateクラスにメソッドが追加されている。

以下に追加されたメソッドを使用した、検索結果の全件数取得が遅い問題への対応策を記述する。

CosmosTemplateクラスを継承したクラスの作成

作成したクラスで、ページャー機能ありの検索を行うpaginationQuery(CosmosQuery, Class<T>, String)メソッドをオーバーライドする。

/**
 * {@link CosmosTemplate}の拡張クラス。<br>
 */
public class SampleCosmosTemplate extends CosmosTemplate {

    /**
     * コンストラクタ。<br>
     * 
     * @param cosmosFactory
     *            {@link CosmosFactory}
     * @param cosmosConfig
     *            {@link CosmosConfig}
     * @param mappingCosmosConverter
     *            {@link MappingCosmosConverter}
     * @param cosmosAuditingHandler
     *            {@link IsNewAwareAuditingHandler}
     */
    public SampleCosmosTemplate(CosmosFactory cosmosFactory, CosmosConfig cosmosConfig,
            MappingCosmosConverter mappingCosmosConverter, IsNewAwareAuditingHandler cosmosAuditingHandler) {
        super(cosmosFactory, cosmosConfig, mappingCosmosConverter, cosmosAuditingHandler);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Page<T> paginationQuery(CosmosQuery query, Class<T> domainType, String containerName) {
        // 検索条件がある場合はコンストラクタの引数に追加する
        final SqlQuerySpec countQuerySpec = new SqlQuerySpec("SELECT VALUE COUNT(1) FROM r");
        // 検索の実行
        Slice<T> response = sliceQuery(query, domainType, containerName);
    
        PageRequest page = PageRequest.of(0, 1, Sort.unsorted());
          // 検索結果の全件数取得 こっちのページは0を指定する
        Slice<Long> slice = runSliceQuery(countQuerySpec, page, domainType, Long.class);
        return new CosmosPageImpl<>(response.getContent(), response.getPageable(), slice.getContent().get(0));
    }
}

Configurationクラスへの設定追加

上記で作成したCosmosTemplateが使用されるように設定を追加する。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;

import com.azure.spring.data.cosmos.Constants;
import com.azure.spring.data.cosmos.CosmosFactory;
import com.azure.spring.data.cosmos.config.AbstractCosmosConfiguration;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.core.CosmosTemplate;
import com.azure.spring.data.cosmos.core.convert.MappingCosmosConverter;
import com.azure.spring.data.cosmos.repository.config.EnableCosmosRepositories;

/**
 * CosmosDB操作に必要なクラスを定義するためのConfigurationクラス。<br>
 */
@Configuration
@EnableCosmosRepositories
public class SampleConfiguration extends AbstractCosmosConfiguration {

    @Qualifier(Constants.AUDITING_HANDLER_BEAN_NAME)
    @Autowired(required = false)
    private IsNewAwareAuditingHandler cosmosAuditingHandler;

    /**
     * {@inheritDoc}
     */
    @Override
    protected String getDatabaseName() {
        return "Test";
    }

    /**
     * {@inheritDoc}
     */
    @Bean
    @Override
    public CosmosConfig cosmosConfig() {
        return CosmosConfig.builder()
                .enableQueryMetrics(true)
                .responseDiagnosticsProcessor(new SampleResponseDiagnosticsProcessor())
                .build();
    }

    /**
     * {@inheritDoc}
     */
    @Bean
    @Override
    public CosmosTemplate cosmosTemplate(CosmosFactory cosmosFactory,
            CosmosConfig cosmosConfig,
            MappingCosmosConverter mappingCosmosConverter) {
        return new SampleCosmosTemplate(cosmosFactory, cosmosConfig, mappingCosmosConverter, cosmosAuditingHandler);
    }
}

SDKのクラスを使用

azure-spring-data-cosmosも内部でSDKを使用しているので、SDKのクラスを直接使用してCosmosDBのデータを操作することもできる。

DB操作処理の記述

以下のサンプルではcom.azure.cosmos.CosmosClientをDIしているが、com.azure.cosmos.CosmosClientBuilderクラスを使用してインスタンスを作成することもできる。

import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;

@SpringBootApplication
public class SampleCosmosApplication implements CommandLineRunner {

    @Autowired
    private CosmosClient client;

    public static void main(String[] args) {
        SpringApplication.run(SampleCosmosApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        CosmosDatabase database = client.getDatabase("Test");
        CosmosContainer container = database.getContainer("performance_200000");

        MailHistory mailHistory = new MailHistory();
        mailHistory.setId("0000");
        mailHistory.setMail("a".repeat(256));
        mailHistory.setStatus("02");
        mailHistory.setSubject("あ".repeat(512));
        mailHistory.setDeliveryType("01");
        mailHistory.setService("カ".repeat(30));
        mailHistory.setMailMagazine("カ".repeat(30));
        mailHistory.setDeliveryDateTime(LocalDateTime.now());

        ObjectMapper mapper = ObjectMapperFactory.getObjectMapper();
        String valueAsString = mapper.writeValueAsString(mailHistory);
        ObjectNode cosmosObjectNode = (ObjectNode) mapper.readTree(valueAsString);

        // 1件登録
        CosmosItemResponse<T> result = container.createItem(cosmosObjectNode);
        // メソッドの戻り値からRU値を取得可能
        log.info("RU:{}", result.getRequestCharge());

        List<MailHistory> entities = new ArrayList<>();
        Random random = new Random();
        for (int i = 1; i <= 100000; i++) {
            MailHistory history = new MailHistory();
            history.setId(Integer.toString(i));
            history.setMail("sample%d@co.jp".formatted(i));
            history.setStatus(Integer.toString(random.nextInt(6)));
            history.setDeliveryType(Integer.toString(random.nextInt(3)));

            history.setService("◯◯サービス");
            history.setSubject("キャンペーン通知");
            history.setMailMagazine("キャンペーンメール");
            history.setDeliveryDateTime(LocalDateTime.of(2022, 10, 20, 10, 20, 30, 500));

            entities.add(history);
        }

        ObjectMapper mapper = ObjectMapperFactory.getObjectMapper();
        List<CosmosItemOperation> list = entities
                .stream()
                .map(entity -> {
                    String valueAsString;
                    ObjectNode cosmosObjectNode = null;
                    try {
                        valueAsString = mapper.writeValueAsString(entity);
                        cosmosObjectNode = (ObjectNode) mapper.readTree(valueAsString);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }

                    return CosmosBulkOperations.getCreateItemOperation(cosmosObjectNode,
                            new PartitionKey(entity.getService()));
                })
                .toList();

        // バルクインサート
        Iterable<CosmosBulkOperationResponse<Object>> result = container.executeBulkOperations(list);
        Iterator<CosmosBulkOperationResponse<Object>> iterator = result.iterator();

        double ru = 0;
        while (iterator.hasNext()) {
            CosmosBulkOperationResponse<Object> response = iterator.next();
            ru += response.getResponse().getRequestCharge();
        }
        System.out.println("RU:" + ru);
    }
}

CosmosClientBuilderを使用したCosmosClientの作成

CosmosClient client = new CosmosClientBuilder()
        .endpoint(endpoint)
        .key(key)
        .buildClient();

日付データの登録について

日付データを含むエンティティはデフォルトでは登録できない。
日付データを登録する場合は、エンティティをObjectNodeに変換してから登録処理を行う。

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

void sample() {
    MailHistory mailHistory = new MailHistory();
    mailHistory.setDeliveryDateTime(LocalDateTime.now());

    ObjectMapper mapper = ObjectMapperFactory.getObjectMapper();
    String valueAsString = mapper.writeValueAsString(mailHistory);
    ObjectNode cosmosObjectNode = (ObjectNode) mapper.readTree(valueAsString);

    // 1件登録
    CosmosItemResponse<T> result = container.createItem(cosmosObjectNode);
}