// HashServiceImpl.java
package com.aliware.tianchi;

郭浩's avatar
郭浩 committed
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
项升's avatar
项升 committed
import java.util.concurrent.ThreadLocalRandom;
郭浩's avatar
郭浩 committed
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.dubbo.common.utils.NamedThreadFactory;
项升's avatar
项升 committed

/**
郭浩's avatar
郭浩 committed
 * Facade
项升's avatar
项升 committed
 *
 * @author guohaoice@gmail.com
 */
public class HashServiceImpl implements HashInterface {
郭浩's avatar
郭浩 committed
  private final String salt;
  private final AtomicBoolean init = new AtomicBoolean(false);
  private final List<ThrashConfig> configs;
  private volatile ThrashConfig config;
  private ScheduledExecutorService scheduler =
      new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Benchmark-refresh-permit"));
郭浩's avatar
郭浩 committed
  public HashServiceImpl(String salt) {
      this(salt,Collections.emptyList());
  }
项升's avatar
项升 committed

郭浩's avatar
郭浩 committed
  public HashServiceImpl(String salt, List<ThrashConfig> configs) {
项升's avatar
项升 committed
    this.salt = salt;
郭浩's avatar
郭浩 committed
    this.config = ThrashConfig.INIT_CONFIG;
    this.configs = Collections.unmodifiableList(configs);
项升's avatar
项升 committed
  }

  @Override
  public int hash(String input) {
郭浩's avatar
郭浩 committed
    if (!init.get()) {
      if (init.compareAndSet(false, true)) {
        int startTime = 0;
        for (ThrashConfig thrashConfig : configs) {
          scheduler.schedule(
              () -> refresh(thrashConfig), startTime + config.durationInMs, TimeUnit.MILLISECONDS);
          startTime += config.durationInMs;
        }
      }
    }
    Semaphore permit = config.permit;
项升's avatar
项升 committed
    try {
郭浩's avatar
郭浩 committed
      permit.acquire();
      long baseRtt = nextRTT();
项升's avatar
项升 committed
      Thread.sleep(baseRtt);
郭浩's avatar
郭浩 committed
      return (input + salt).hashCode();
项升's avatar
项升 committed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
郭浩's avatar
郭浩 committed
    } finally {
      permit.release();
项升's avatar
项升 committed
    }
郭浩's avatar
郭浩 committed
    throw new IllegalStateException("Unexpected exception");
  }

  private void refresh(ThrashConfig thrashConfig) {
    this.config = thrashConfig;
项升's avatar
项升 committed
  }

  private long nextRTT() {
    ThreadLocalRandom rng = ThreadLocalRandom.current();
    double u = rng.nextDouble();
    int x = 0;
    double cdf = 0;
    while (u >= cdf) {
      x++;
郭浩's avatar
郭浩 committed
      cdf = 1 - Math.exp(-1.0D * 1 / config.averageRTTInMs * x);
项升's avatar
项升 committed
    }
    return x;
  }
}