用Semaphore解决LeetCode并发练习题

Semaphore

Semaphore不像是内部锁那样要求你在调用wait,notify之前要先拥有锁。信号量的方法对被哪个线程调用没有限制,任何线程都可调用Semaphore的acquire, release,只是信号数量如果不够的话那么线程会在调用acquire时被block而已。另外,一个信号量可以用0初始化,通过release(), release(n)调用给它添加可用信号的数量。

比如,下面的代码通过release(3)给信号量添加了3个许可。我自己心里总是把一个信号隐喻到一个许可证。就应付这几道练习题来说,这种理解似乎还能自圆其说。

Semaphore semaphore = new Semaphore(0);
semaphore.release(3);
int i = semaphore.availablePermits();
System.out.println(“i = “ + i);

利用信号量协调线程的执行其核心是通过在业务逻辑的(当然是由不同线程驱动的)进口、出口来调节信号/许可的数量。

利用下面的知识点应该就足够团灭这几道题了。

  1. 线程只有通过acquire(n)拿到所需信号才能继续执行,如果信号数量不够就会被阻塞。

  2. 调用release(n)会释放指定数量的信号。释放信号的线程不必是之前acquire信号的线程。

  3. 因此一般的套路是:对于需要首先在某线程执行的逻辑我们可以初始化适量的信号,而对需要阻塞的线程则把信号初始化为0.

4)某个线程执行完一步,就释放信号给下一步让下一步的逻辑可以获得许可在线程中运行起来。如此递推下去。

LeetCode并发专栏的六道题都有多种解法,都可以用信号量刷一遍。

下面的练习我把Semaphore变量都命名成了permission来强调这种基于信号的允许、禁止的感觉。

LeetCode 1114, 多线程按序打印1,2,3。

public class FooBySemaphore {

    public FooBySemaphore() {}

    // 先打印 one, 所以这个初始化为放行。
    private Semaphore firstPermission = new Semaphore(1);
    // 初始化 0许可,在任务的不同阶段 ‘添加许可’ 以 ‘放行线程’
    private Semaphore secondPermission = new Semaphore(0);
    private Semaphore thirdPermission = new Semaphore(0);

    public void first(Runnable printFirst) throws InterruptedException {
        firstPermission.acquire();
        printFirst.run();
        secondPermission.release(); // 放行第二个线程
    }

    public void second(Runnable printSecond) throws InterruptedException {
        secondPermission.acquire();
        printSecond.run();
        thirdPermission.release(); // 放行第三个线程
    }

    public void third(Runnable printThird) throws InterruptedException {
        thirdPermission.acquire();
        printThird.run();
        firstPermission.release(); // 放行第一个线程
    }
}

LeetCode 1117, 多线程生成水分子。

public class H2OBySemaphore {
    /**
     * 1个O原子需要2个H原子,就是说,O不能被生产直到有两个H原子被生产出来。
     *
     * 可以定义一个信号量 oSemaphore,每当一个氢原子被生产出来,就把这个信号量加1。
     * 生产出2个氢原子后oSemaphore的信号数量就是2。
     * 而生产O原子的线程需要被阻塞,直到能够 acquire 到两个信号(对应到有2个氢原子了)。
     *
     * 同样,氢原子不能被生产,直到有一个氧原子被产生出来。
     */
    public H2OBySemaphore() {
    }

    private Semaphore hPermission = new Semaphore(2); //_ 这个设置是让氢原子有优先权先生产。
    private Semaphore oPermission = new Semaphore(0); //_ 如果是把这个信号量初始化为2,就是让氧原子先生产。
    public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
        hPermission.acquire();
        releaseHydrogen.run();
        oPermission.release();
    }

    public void oxygen(Runnable releaseOxygen) throws InterruptedException {
        oPermission.acquire(2);

        releaseOxygen.run();
        hPermission.release(2);
    }
}

LeetCode 1226, 哲学家就餐问题。

public class DiningPhilosophers {

    private Semaphore[] forksPermission = new Semaphore[5];

    public DiningPhilosophers() {
        for (int i = 0; i < 5; i ++) {
            forksPermission[i] = new Semaphore(1);
        }
    }

    // call the run() method of any runnable to execute its code
    public void wantsToEat(int philosopher,
                           Runnable pickLeftFork,
                           Runnable pickRightFork,
                           Runnable eat,
                           Runnable putLeftFork,
                           Runnable putRightFork) throws InterruptedException {

        int leftFork = philosopher;
        //_ 右边的叉子编号应该比左边的小1,为了防止溢出 (5 + philosopher - 1) % 5
        int rightFork = (philosopher + 4) % 5;
        while (true) {
            // 尝试拿左叉,再拿右叉。如果同时抢左右则有死锁可能性。
            if (forksPermission[leftFork].tryAcquire()) { 
                // pickLeftFork.run(); //_ 其实应该这样更有道理,拿到左叉许可就应可执行。
                // 拿到左叉,尝试拿右叉。
                if (forksPermission[rightFork].tryAcquire()) {
                    // 左叉,右叉都拿到了。开吃! 
                    pickLeftFork.run();
                    pickRightFork.run();
                    
                    eat.run();

                    putLeftFork.run();
                    forksPermission[leftFork].release();
                    putRightFork.run();
                    forksPermission[rightFork].release();
                    
                    break; // 吃完退出(线程)
                } else {
                    // 抢到左叉却没拿到右叉,则释放左叉给别人。自己也再重新抢。
                    forksPermission[leftFork].release();
                    Thread.sleep(1); //_ 为什么必须sleep?
                }
            } else {
                // 没抢到左叉,歇歇继续试。
                Thread.sleep(1);  //_ 为什么必须sleep?
            }
        }
    }
}

锁这个词太有意思了,有几个意思?

读了第15章,大致感觉到了CAS的乐观锁特性。锁这个词太有意思了,有几个意思?

并发(多线程)编程的挑战之一就是解决对共享数据的竞争读写。各种Synchronizer的实现构成了解决这个问题的工具箱。
从更一般的、更笼统的意义上讲锁跟synchronizer其实一个意思,都可以看作是协调线程、控制线程的对象。这里所谓的更一般是说,我们可以把CountDownLatch、Semphore这些不带Locak字眼的Synchronizer也化分到锁这个概念里。

这样把概念统一到锁上之后,我们就可以把锁/Synchronizer分成耳熟能详的两类了:悲观锁、乐观锁。
对于乐观锁正如数据库事务解决方案里提到的乐观锁的概念,核心就是与观察旧值是否有变化而决定是否成功修改。
所以乐观锁想成基于“compare and set”的算法更好。更进一步把锁想成算法就好,具体的门锁那种感觉有点二狭隘了。
从最根本的提供“锁“算法实现的CPU指令上讲:对于乐观锁是CAS(LL, SC)提供了最核心的支持,对于悲观锁(对应到OS的互斥量,java的thread都一一映射到OS的thread)是lock指令前缀。除了这些CPU指令还有混存一致性协议这样的技术一起向上层提供了实现各种Syncrhonizer(悲观锁/乐观锁)的最基础支持。更详细内容可以参考文末的引用链接。

基于CAS实现的乐观锁其实不牵扯到什么类似mutex这样具体的锁,把它当作一种synchronizer就好(怎么有点儿金刚经的感觉)。我们可以利用CAS指令实现了非阻塞的多线程协调工作的各种算法。比如,列用CAS实现非阻塞计数器、非阻塞栈、非阻塞队列。JDK中的ConcurrentLinkedQueue就是利用原子化域更新器实现的支持并发(多线程)操作的非阻塞数据结构。

基于CAS cpu指令的有ABA问题。而load-linked/store-condition这样的cpu指令可以避免ABA问题。
从API的角度,可以用AtomicMarkableReference, AtomicStampedReference避免CAS下的ABA。
https://en.wikipedia.org/wiki/ABA_problem。

References:
《Java并发编程实践》
https://www.ibm.com/developerworks/cn/java/j-jtp04186/

0、https://blog.csdn.net/Saintyyu/article/details/94493694
1、https://blog.csdn.net/zacklin/article/details/7445442 原子操作与 x86 上的 lock 指令前缀
2、https://www.cnblogs.com/xrq730/p/7048693.html 就是要你懂Java中volatile关键字实现原理
3、https://blog.csdn.net/u011244446/article/details/52574369 Linux 互斥锁、原子操作实现原理
4、https://www.jianshu.com/p/6745203ae1fe 关于volatile、MESI、内存屏障、#Lock
5、https://www.jianshu.com/p/61490effab35 操作系统中锁的原理
6、https://www.cnblogs.com/XiaoHDeBlog/p/3740270.html Linux中同步互斥机制研究之原子操作
7、https://blog.csdn.net/Saintyyu/article/details/100838503 cas vs mutex

About Lock and Synchronizer

引子:说AbstractQueuedSynchronizer(AQS)是构建锁和Synchronizer的框架。锁,好像大家都知道,至少自以为都知道 :)。那什么是synchronizer呢? Lock和Synchronize是什么关系?

Synchronizer,它是一个根据自身状态调节线程执行的对象。就是用来协调(多)线程执行的对象。从这个角度讲Java的内置锁就是一种synchronizer,它以互斥的可重入的方式协调/控制线程的执行。
Java的阻塞队列也是一种synchronizser。
再比如:信号量Semaphore、闭锁Latch、关卡Barrier都是不同类型的synchronizer。

信号量

Semaphore可以用来控制同时访问某资源的线程数量。把对这些资源的获取操作包装起来,获取资源前先调用信号量的acquire()申请许可,资源使用使用完毕后通过release()释放许可。对池化资源的管理一般可用信号量完成。

闭锁

英文Latch有门闩的意思,门闩就是用来把门关紧不让出入。在并发编程里也是这个意思,我们用latch这个对象禁止线程的执行,什么时候允许线程通过这个门闩呢?对于CountDownLatch来说,就是计数变为0的时候。对于FutureTask来说就是可以拿到计算结果的时候(当然也可能是计算异常了)。一个应用场景:可以把闭锁当作一个发令枪,它可以让线程等到信号后一起运行。

关卡

Barrier中文就是“障碍物、栅栏”的意思,文绉绉的翻译就是“关卡”。它其实像极了闭锁,与闭锁不同的是:闭锁等待的是事件,而关卡等待的是线程。从API调用的感觉感觉上讲,Barrier是在工作线程正儿八经的工作都执行完毕后(取决于业务场景),调用barrier.await()使工作线程阻塞住,直到所有其它工作线程也都完成个各自的任务并都调用了barrier.await(),这个时候关卡就被冲破。这里等待的条件是有线程调用barrier.await()。而Latch是在工作线程中调用latch.await(),等待闭锁被开启来执行。闭锁是怎样被开启的?它是依靠latch.countDown()到0后被开启的,谁countdown跟哪个线程没什么直接关系。而barrier.await()是需要实实在在的线程阻塞,这就是为什么说barrier等待的是线程,latch等的是信号。
经常使用关卡到场景是,一个任务分成n个子任务后,等待这n个子任务完成后再做下一步工作。
CyclicBarrier的构造函数接受一个线程数以及一个关卡被突破后要执行的动作。
Exchanger是另一种关卡,用来为两个线程交换数据,当然是以线程安全的方式交换。

Reference:
https://web.mit.edu/6.005/www/fa15/classes/23-locks/
《Java并发编程实践》
https://en.wikipedia.org/wiki/Synchronization_(computer_science)

Ordered and Ordering

Java关于比较有两个基础接口:Comparable,Comparator。
其中Comparatable接口只定义了一个方法

public int compareTo(T o);

而Comparator相对复杂:

int compare(T o1, T o2);
thenComparing() ... 

Scala与之对应的比较是Ordered,Ordering。
Ordered类似Comparable,并多出

https://www.scala-lang.org/api/current/scala/math/Ordering.html

This trait and scala.math.Ordered both provide this same functionality, but in different ways. A type T can be given a single way to order itself by extending Ordered. Using Ordering, this same type may be sorted in many other ways. Ordered and Ordering both provide implicits allowing them to be used interchangeably.

未来的邮箱:Promise

按照Erik的介绍,我们可以把Promise当作是装Future的邮箱/容器。
可从它的两个方法签名可以体会下,你可以向邮箱里放成功的数据或者失败的Exception。

def successful[T](result: T): Promise[T]
def failed[T](exception: Throwable): Promise[T]

放了值之后,可以调用Promise的future() 得到一个已经完成了的Future。
Promise最核心的就是这个逻辑:你可以通过Promise.future()得到一个Future对象,而future里的计算结果是在什么其它地方(当然一般情况是在其它线程里、回调代码里)计算好放进去的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val p = Promise[Int]()
val f = p.future

def produce() = Future {
Thread.sleep(500)
p.success(1)
println("Produce done")
}

def consume() = Future {
f.foreach(r => println(s"Get $r"))
println(s"Consume done")
}

produce()
consume()
StdIn.readLine("End?\n")

这段代码的打印如下内容,可以看到consume()方法已经执行完了才打印出“Get 1”。

Consume done
End?
Produce done
Get 1

这个也好理解,foreach只是针对future Success的情况提供了callback机制。需要注意的是Future可以通过onComplete, foreach注册多个callback,但是这些callback运行先后以及运行所在线程是没有保证的。这点区别于map、flatMap。

本来想试着用Promise实现 List[Future[T]] 到 Future[List[T]]的转换,找到foldLeft这种方式。如下:

1
2
3
def sequence[T](fts: List[Future[T]]): Future[List[T]] = {
fts.foldLeft(Future{ List.empty[T]})((acc, ft) => acc.flatMap(ts => ft.map(t => ts :+ t)))
}

画蛇添足地再体验下promise:

1
2
3
4
5
6
7
def sequenceByPromise[T](fts: List[Future[T]]): Future[List[T]] = {
val p = Promise[List[T]]()
val result = p.success(List.empty[T]).future

fts.foldLeft(result)((acc, ft) => acc.flatMap(ts => ft.map(t => ts :+ t)))
result
}

下面代码演示了如何把callback风格代码转为Future风格。在Akka actor框架里,如果需要异步的执行代码并且后面的代码需要这个异步执行的结果,我们就可以通过Promise把结果封装到Future里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

trait CallbackBasedApi {
def computeIntAsync(continuation: Try[Int] => Unit): Unit
}

trait FutureBasedApi {
def computeIntAsync(): Future[Int]
}

def futurize(callbackBasedApi: CallbackBasedApi): FutureBasedApi = {
val p = Promise[Int]()

// 体会下把“Try=>Unit”当做complete的参数。
callbackBasedApi.computeIntAsync( t => p.complete(t))

new FutureBasedApi {
def computeIntAsync() = p.future
}
}

Reference:
https://docs.scala-lang.org/overviews/core/futures.html

Try Future sequence

这两天重新看了点儿Erik Meijer讲Try和Future,自己对他所讲内容没有什么违和感了,蛮开心的。

1)关于Option[T], Either[E, R] 和 Try[T]的使用场景。

这三种type很容易让人想到处理Exception的场景。这些types如果只是针对Exception就略显狭隘了。现在我的感觉是:
1)Option适于处理业务逻辑上需要空值的地方,这里不一定是因为Exception导致。往往是业务上需要表达这种“空”/“没值”。
2)Either的左值不一定是Exception,表示一个计算可能有两种结果比较好,右值按照惯例表示正确/正常路径下的结果。左值是另个分支的结果。当然,也可以放Exception,Error什么的。STTP的Response body部分就是一个Either[Error, T]。
3)Try,其实才是最适合表示一个计算可能出现Exception的type。Try的apply()接受的就是一个代码块并运行,对异常封装到子类Failure。
最后的感觉是Option,Either更像标量,是结果的一个静态表示。而Try是动态的,包含了代码的执行及对结果的封装。看Try的定义体会下:

1
2
3
4
5
6
object Try {
def apply[T](r: => T): Try[T] =
try Success(r) catch {
case NonFatal(e) => Failure(e)
}
}

2)Future’s sequence()的实现。

1)这是Erik喜欢的递归方式的实现。
其中两个flatMap都是Future上的flatMap。

1
2
3
4
5
6
def sequence[T](fts: List[Future[T]]): Future[List[T]] = {
fts match {
case Nil => Future(Nil)
case ft::fts => ft.flatMap( t => sequence(fts).flatMap(ts => Future(t::ts)))
}
}

2)通过类型推导,我发现把flatMap()换成了map()也符合类型检查,似乎也也没有大的问题。

1
2
3
4
5
6
def sequence[T](fts: List[Future[T]]): Future[List[T]] = {
fts match {
case Nil => Future(Nil)
case ft::fts => ft.flatMap( t => sequence(fts).map(ts => t::ts))
}
}

3)Erik通过async,await实现的sequence。
这种方式似乎更容易理解,但风格太不FP了。Erik警告说,如果是基于Future编程,那么不要wait。但是在async块里除外,因为async本身是异步的所以不会阻塞。另外,async/await在模块scala-async里,需要加到sbt的依赖里。

1
2
3
4
5
6
7
8
9
10
import scala.async.Async.{async, await}
def sequence[T](fs: List[Future[T]]): Future[List[T]] = async {
var _fs = fs
val result = ListBuffer[T]()
while (_fs != Nil) {
result += await { _fs.head }
_fs = _fs.tail
}
result.toList
}

4)还可以通过Promise来实现。
等磕完promise再说吧。。。

STTP的基本使用(3):把List[Future[Either[Error, T]]]转为Future[(List[Throwable], List[T])]

如果使用 AkkaHttp 作为 STTP 的 backend 来并发地处理 list of url,就会得到类似 List[Future[Response[Either[ResponseError[io.circe.Error], T]]]],这样的结果。

一般地,我们更期望给上层调用者返回 Future[(List[Exception], List[T])]这样的类型。

下面代码演示了如何把:
List[Future[Response[Either[ResponseError[io.circe.Error], NasaData]]]]
准换为
Future[(List[Throwable], List[NasaData])]。
其中 tuple 的第一部分表示 response 中所有的 exception,第二部分表示所有正常数据。

几个技术关键点是:
1)Future.sequence 用于把 List[Future] 转为 Future[List]。
2)忽略 STTP Response 里除去 body 数据的其它部分,并把 response 转为 Either[Throwable, NasaData]。
3)经过步骤 1,2,数据类型已经是 Future[List[Either[Throwable, NasaData]]]。
4)List[Either[Throwable, NasaData]] 转为 (List[Throwable], List[NasaData])的思路是:
a) 构造一个空的 Tuple (List[Throwable(), ListNasaData)作为一个累计器。
b) 使用 List.folderLeft()遍历元素,根据是 left 还是 right 累加到累计器的_1 或_2。

下面链接的文章演示了两种转换方式,一种是 Scala 原生手写,一种是使用 CAT。 https://pbassiner.github.io/blog/composing_future,_list_and_either.html。
但是,其中语句 case Left(err) => acc.left.map(_ => err) 的逻辑似乎与期望不符。如果 list of either 中有 Left,并不能工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
val erA: Either[String, Int] = Right(1)
val erB: Either[String, Int] = Right(2)
val erC: Either[String, Int] = Right(3)
val elA: Either[String, Int] = Left("error1")
val elB: Either[String, Int] = Left("error2")
val listOfEither = List(erA, elA, erB, erC, elB)
private val eitherOfList: Either[String, List[Int]] = listOfEither.foldLeft(Right[String, List[Int]](List()).asInstanceOf[Either[String, List[Int]]])((acc, elem) => {
elem match {
case Right(v) => acc.map(l => l :+ v)
case Left(e) => acc.left.map(_ + e)
}
})
println(s"eitherOfList: $eitherOfList")
输出为:eitherOfList: Right(List(1, 2, 3))

下面的代码演示了并发发出100个RestAPi请求获取数据的处理代码,结果保存在Tuple里,分别是所有失败和成功的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import io.circe.generic.auto._
import sttp.client._
import sttp.client.akkahttp.AkkaHttpBackend
import sttp.client.circe._

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global

val NASA_API_KEY = "XXXXXXXXXXXXXXX"
val baseUrl = s"https://api.nasa.gov/neo/rest/v1/neo/browse?api_key=${NASA_API_KEY}"

var pageSize = 10
var pageNumb = 100

case class Links(self: String, next: String)

case class Page(number: Int, size: Int, total_elements: Int, total_pages: Int)

case class NearEarthObject(absolute_magnitude_h: Double, designation: String)

case class NasaData(links: Links, page: Page, near_earth_objects: List[NearEarthObject])

implicit val sttpBackend = AkkaHttpBackend()

// Note: List[Future[Response[Either[ResponseError[io.circe.Error], NasaData]]]] ---> Future[Either[String, List[NasaData]]]

val listOfFutureResult: List[Future[Response[Either[ResponseError[io.circe.Error], NasaData]]]] =
(0 until pageNumb).toList.map(pageIndex => {
val pageUrl = s"http://www.neowsapp.com/rest/v1/neo/browse?page=$pageIndex&size=$pageSize&api_key=${NASA_API_KEY}"
basicRequest.get(uri"$pageUrl")
.response(asJson[NasaData])
.send()
})

// Note: Future[List] --> List[Future]
val futureOfList: Future[List[Response[Either[ResponseError[io.circe.Error], NasaData]]]] = Future.sequence(listOfFutureResult)

// Note: 把Response[Either[ResponseError[io.circe.Error], NasaData]] 转为了 Either[Throwable, NasaData]
val respOfEither2Either: Response[Either[ResponseError[io.circe.Error], NasaData]] => Either[Throwable, NasaData] = resp => {
println(s"resp==> code:${resp.code.code} ${resp.statusText}")
val newEither: Either[Throwable, NasaData] = resp.body match {
case Left(respErr) => Left(respErr.getCause)
case Right(nasaData) => Right(nasaData)
}
newEither
}

val futureOfListOfEither: Future[List[Either[Throwable, NasaData]]] = futureOfList.map(listOfResp => {
listOfResp.map(respOfEither2Either)
})

val listOfEither2TupleOfList: List[Either[Throwable, NasaData]] => (List[Throwable], List[NasaData]) = listE => {
listE.foldLeft(List[Throwable](), List[NasaData]())((acc, eData) => {
eData match {
case Right(nasaData) => (acc._1, acc._2 :+ nasaData)
case Left(t) => (acc._1 :+ t, acc._2)
}
})
}

val futureOfEitherOfList = futureOfListOfEither map listOfEither2TupleOfList

val result = Await.result(futureOfEitherOfList, 1.minute)
println(s"Exceptions in Response: $result._1")
println(s"Data in Response: $result._2")

另,Akka HTTP默认主机连接持是32个,所以需要修改applicaiton.confg配置以支持更多连接数。
akka.http.host-connection-pool.max-open-requests = 128

STTP的基本使用(2):Json

1)对 Json 的支持

对 request,response 消息体中 JSON 的支持一般就是要做两件事:一是定义 josn 的格式规范,另外就是根据格式规范进行序列化、反序列化。STTP 提供开箱即用的对第三方 JOSN 库的支持:包括 Circe、Json4s、spray-json 等。
目前,从项目活跃度、维护程度上讲 Circe 应该是不错的选择。(https://scala.libhunt.com/compare-circe-vs-spray-json)

2)Circe

Circe 竟然是基于挑战智商的 CAT 实现的!官网: https://circe.github.io/circe/。
Circe 使用 Encoder、Decoder 编解码 josn。Encoder[A]把 A 转为 Json,Decoder[A]则把 Josn 转为 A 或者 Exception(如果失败话)。Circe 对 Scala 库中常见类型就提供了默认的隐式实例。
对于简单结构的 case class,使用 Circe 提供的自动、半自动的编解码就好。Circe 当然也支持自定义转换或对 ADT 类型数据转换。智商够用可以去看https://github.com/milessabin/shapeless。
使用 circe-optics 模块还可以对 Json 数据进行遍历、修改。
比如,下面从 order 中获取用户的电话号码。

1
val phoneNum = root.order.customer.contactDetails.phone.string

如下是调用 NASA 某 Api 并解析代码示例, 是不是太方便了?!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import io.circe.generic.auto._
import sttp.client._
import sttp.client.circe._

case class Links(self: String, next: String)
case class Page(number: Int, size: Int, total_elements: Int, total_pages: Int)
case class NearEarthObject(absolute_magnitude_h: Double, designation: String)
case class NasaData(links: Links, page: Page, near_earth_objects: List[NearEarthObject])

implicit val backend = HttpURLConnectionBackend()
val resp = basicRequest
.get(uri"https://api.nasa.gov/neo/rest/v1/neo/browse?api_key=${NASA_API_KEY}")
.response(asJson[NasaData])
.send()

resp.body match {
case Left(failure) => println(failure)
case Right(data) => {
println("Get response from NAS Api:")
println(data)
}
}

3)Resilience 弹性

STTP本身不提供Resilience方面的支持,可以借助上层的第三方库(Future、Monix、Akka、Resilientce4J)实现诸如:重试、断路、限流这样的功能。

STTP的基本使用(1):Request、Response

试试看 get 一下新技能:STTP with AKKA。

1)定义 request。
请求由不可变的数据结构 RequestT 来表示,其值可以由 sttp.client.clientRequest 来表示,并可通过它提供的各种方法(cookie, body, responseAs…)来细力度的来设定 reqeust 对象的数据(包括返回的 response 格式)。

2)发送请求。
为了发送请求,这里需要一个隐式对象 backend。核心的绝大部分工作都发生在 backend。比如把请求转为特定 backend 格式、打开 HTTP 连接、发送接收数据、把接收到的数据转为 STTP 的 response 格式等。
Backend 管理着连接池、处理 response 的线程池,根据 backend 的情况还支持 streaming 和 websockets。
请求可以同步发送,返回值类型为 Response[T]。
请求也可以异步发送,返回值类型为 Future[Response[T]]。Monix 这种 backend 的返回值则是 Task[Resonse[T]]。

关于更详细的如何使用 request,比如设置 cookie、认证、form、上传文件、proxy 等,可参考:https://sttp.readthedocs.io/en/latest/requests/basics.html

如何使用 AKKA 这样支持 streaming 的 backend,参考:
发送流:https://sttp.readthedocs.io/en/latest/requests/streaming.html
接收流:https://sttp.readthedocs.io/en/latest/responses/body.html#streaming

3)Responses
Response 是 case class Response[T]的一个实例,其中 T 是 response body 的 type。
如果 connection 出现问题,在同步的 backend 情况下 exception 会被返回,对于异步来说就是 failed future。 详见:https://sttp.readthedocs.io/en/latest/responses/exceptions.html
使用.body: T 方法来获取 response body。

Reference:
https://sttp.readthedocs.io/en/v2.0.0-rc13/requests/type.html

JMM

下图列出了重排序造成的内存可见性问题,以及 Java 针对这些问题的解决方案“Happens-Before”。