package com.example;

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import java.io.BufferedWriter;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.time.Instant;


/**
 * HTTP-triggered function that publishes a single JSON message to a Pub/Sub topic with message
 * ordering enabled and then replies with {@code "Sent!"}.
 *
 * <p>The target project, topic and Pub/Sub endpoint are fixed constants of this class. The outcome
 * of the publish is only logged (message id on stdout, failure details on stderr); it does not
 * influence the HTTP response.
 */
public class Example implements HttpFunction  {
  private static final String PROJECT_ID = "david-playground-1";
  private static final String TOPIC_ID = "test";
  // NOTE(review): presumably pinned to a single regional endpoint because ordered publishing is
  // region-sensitive; confirm against the Pub/Sub message-ordering documentation.
  private static final String PUBSUB_ENDPOINT = "europe-west1-pubsub.googleapis.com:443";
  private static final long SHUTDOWN_TIMEOUT_MINUTES = 1;

  /**
   * Publishes one message to the configured project and topic.
   *
   * @throws Exception if the publisher cannot be created or the calling thread is interrupted
   *     while waiting for the publisher to terminate
   */
  public static void run() throws Exception {
    publishWithOrderingKeysExample(PROJECT_ID, TOPIC_ID);
  }

  /**
   * Handles an HTTP request by publishing one message and writing {@code "Sent!"} to the response.
   *
   * @param request the incoming HTTP request (not inspected)
   * @param response the HTTP response that receives the confirmation text
   * @throws Exception if publishing setup fails or the response cannot be written
   */
  @Override
  public void service(HttpRequest request, HttpResponse response) throws Exception {
    run();
    BufferedWriter writer = response.getWriter();
    writer.write("Sent!");
  }

  /**
   * Publishes a small JSON document to the given topic using an ordering key, then shuts the
   * publisher down and waits for it to terminate.
   *
   * <p>The ordering key and the {@code send_time} field are both the current epoch-millisecond
   * timestamp, so calls made at different milliseconds use different ordering keys.
   *
   * @param projectId id of the Google Cloud project that owns the topic
   * @param topicId id of the topic to publish to
   * @throws IOException if the publisher cannot be created
   * @throws InterruptedException if interrupted while waiting for the publisher to terminate
   */
  public static void publishWithOrderingKeysExample(final String projectId, final String topicId)
      throws IOException, InterruptedException {
    final TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    final Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEndpoint(PUBSUB_ENDPOINT)
            .setEnableMessageOrdering(true)
            .build();

    final String orderingKey = String.valueOf(Instant.now().toEpochMilli());
    final String jsonString =
        String.format("{\"a\":\"a\",\"b\":\"b\",\"c\":\"c\",\"send_time\":%s}", orderingKey);
    final PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(jsonString))
            .setOrderingKey(orderingKey)
            .build();
    // Log the readable payload: concatenating the ByteString itself would print its debug
    // summary rather than the JSON text.
    final String payloadText = pubsubMessage.getData().toStringUtf8();

    try {
      final ApiFuture<String> future = publisher.publish(pubsubMessage);
      // Add an asynchronous callback to handle publish success / failure.
      ApiFutures.addCallback(
          future,
          new ApiFutureCallback<String>() {

            @Override
            public void onFailure(Throwable throwable) {
              if (throwable instanceof ApiException) {
                final ApiException apiException = (ApiException) throwable;
                // Details on the API exception.
                System.err.println(apiException.getStatusCode().getCode());
                System.err.println(apiException.isRetryable());
              }
              // Always include the throwable so non-API failures keep their cause in the log.
              System.err.println(
                  "Error publishing message : " + payloadText + " : " + throwable);
            }

            @Override
            public void onSuccess(String messageId) {
              // Once published, returns server-assigned message ids (unique within the topic).
              System.out.println(payloadText + " : " + messageId);
            }
          },
          MoreExecutors.directExecutor());
    } finally {
      // When finished with the publisher, shutdown to free up resources.
      publisher.shutdown();
      final boolean terminated =
          publisher.awaitTermination(SHUTDOWN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
      if (!terminated) {
        // Surface the timeout instead of silently returning with work possibly still pending.
        System.err.println(
            "Publisher did not terminate within " + SHUTDOWN_TIMEOUT_MINUTES + " minute(s)");
      }
    }
  }
}
