侧边栏壁纸
博主头像
春潮带雨晚来急博主等级

行动起来,活在当下

  • 累计撰写 32 篇文章
  • 累计创建 1 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

lambda2

Administrator
2023-12-26 / 0 评论 / 0 点赞 / 126 阅读 / 13277 字
> SpringBoot2.0不容错过的新特性 WebFlux响应式编程

##### 1.流概念
```
public class StreamDemo1 {
    public static void main(String[] args) {
        int[] nums = {1, 2, 3};
        // 外部迭代
        int sum = 0;
        for (int i : nums) {
            sum += i;
        }
        System.out.println("结果为:" + sum);

        //使用stream的内部迭代
        int sum1 = IntStream.of(nums).sum();
        System.out.println("结果为:" + sum1);


        // map就是中间操作(返回stream的操作)
        // sum 就是终止操作
        int sum2 = IntStream.of(nums).map(i -> i * 2).sum();
        System.out.println("结果为:" + sum2);

        // 正常求值 ( 会执行StreamDemo1 方法)
        int sum3 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
        // 惰性求值 ( 这面的这一行代码,都不会执行的)
        IntStream intStream = IntStream.of(nums).map(StreamDemo1::doubleNum);
        System.out.println("惰性求值就是 终止操作 没有调用的情况下 , 中间操作不会执行!");
    }

    public static int doubleNum(int i) {
        System.out.println("执行了乘以2");
        return i * 2;
    }
}
```

##### 2.流创建
开始方式 | 相关方法
---|---
集合 | Collection.stream/parallelStream
数组 | Arrays.stream
数字Stream 1| IntStream/LongStream.range/rangeClosed
数字Stream 2|Random.ints/longs/doubles
自己创建 | Stream.generate/iterate

```
public class StreamDemo2 {
    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();

        // 从集合创建
        list.stream();
        list.parallelStream();


        // 从数组创建
        Arrays.stream(new int[]{1, 3, 5});


        // 创建数字流
        IntStream.of(1, 2, 3);
        IntStream.rangeClosed(1, 10);  // 创建了一个 1到10之间的数字流

        // 使用random创建一个无限流 (limit是断路操作,无限的边界)
        new Random().ints().limit(10);
        Random random = new Random();

        // 自己产生流
        Stream.generate(() -> random.nextInt()).limit(20);

    }
}
```
###### 中间操作

开始方式 | 相关方法
---|---
无状态操作 1| map/mapToXxx
无状态操作 2| flatMap/flatMapToXxx
无状态操作 3| filter
无状态操作 4|peek
无状态操作 5| unordered
有状态操作 1| distinct
有状态操作 2| sorted
有状态操作 3| limit / skip

-   无状态表示与其他前后没有任何关系。有状态表示需要其他的值来做出相应的判断等
    -   eg:排序,是需要其他的都得出了一个最终结果后才能进行排序
-   它们公有的特性是,都是返回的一个String,并且可以继续链式编程下去。

```
1.map/mapToXxx

map: 把A对象转成B对象
mapToInt: 把一个对象转换成一个数字(一般都是获取这个对象的属性。比如传入一个字符串,返回这个字符串的长度[String -> int])

2. flatMap/flatMapToXxx (拉平)
eg: 对象A里面有个属性B, B是一个集合 。那么,flatMap 是可以获取所有A对象的所有B属性的列表

3. filter

4. peek (入参是消费者),与终止操作里面的forEach很像[peek是中间操作,forEach是终止操作]

5. unordered是在并行流的时候会用到
```
eg:
```
public class StreamDemo3 {
    public static void main(String[] args) {
        String str = "my name is 007";

        /** 把每个单词的长度调用出来 **/
        Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(s -> s.length()).forEach(System.out::println);

        /** flatMap A-B属性(是个集合),最终得到所有的A元素里面的所有B属性集合 **/
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed());
        /** intStream/longStream 并不是Stream的子类,所以要进行装箱 boxed **/
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.println((char) i.intValue()));

        /** peek 用户debug.是个中间操作,forEach是终止操作 **/
        Stream.of(str.split(" ")).peek(System.out::println);

        /** limit 使用,主要用于无限流 **/
        new Random().ints().forEach(System.out::println);
        new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println);
    }
}

```
##### 流的终止操作

开始方式 | 相关方法
---|---
非短路操作| forEach/forEachOrdered
非短路操作| collect/toArray
非短路操作| reduce
非短路操作| min/max/count
短路操作| findFirst/findAny
短路操作| allMatch/anyMatch/noneMatch

-   短路操作:eg findFirst 查找一个数,只要找到了这个数,那么这个流就可以结束。
-   非短路操作:   不到操作或者遍历全部完成,是不会停止这个流的操作的

```
public class StreamDemo4 {
    public static void main(String[] args) {
        String str = "my name is 007";
        /** ************************ forEach   forEachOrdered ******************************************* **/
        /** 使用并行流 **/
        str.chars().parallel().forEach(i -> System.out.print((char) i));
        System.out.println();
        /** 使用 forEachOrdered 保证顺序 **/
        str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
        System.out.println();

        /** *************************** collect  to Array********************************************** **/

        /** 收集到list **/
        List<String> collect = Stream.of(str.split(" ")).collect(Collectors.toList());
        System.out.println(collect);

        /** ********************************** reduce ************************************ **/
        /** 把一个流,合成一个数据 || 拼接字符串**/  /**  这种字符串的拼接返回一个Optional,可以调用Optional的方法来操作 **/
        Optional<String> reduce = Stream.of(str.split("")).reduce((s1, s2) -> s1 + "|" + s2);
        System.out.println(reduce.orElse(""));

        /** 带初始化值的reduce 是直接返回一个String **/
        String string = Stream.of(str.split(" ")).reduce("", (s1, s2) -> s1 + "|" + s2);
        System.out.println(string);

        /** 带初始值的其他操作;eg : 计算所有单词的总长度**/
        Integer length = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (s1, s2) -> s1 + s2);
        System.out.println(length);

        /** ************************************** min  max  count ********************************** **/
        /** 返回出集合中单词长度最长的 **/
        Optional<String> max = Stream.of(str.split(" ")).max((s1, s2) -> s1.length() - s2.length());
        System.out.println(max.get());

        /** 短路操作----------------------------------------------------------------- **/
        OptionalInt first = new Random().ints().findFirst();
        System.out.println(first.getAsInt());
    }
}

```

##### 并行流
```
package stream;

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

/**
 * Created By  醉美柳舞之众星捧月
 *
 * @author song
 * @date 2018/12/27 17:03
 */
public class StreamDemo5 {
    public static void main(String[] args) {

        /** 1. 常规普通 并行流  **/
//        /** 调用parallel 产生一个 并行流 **/
//        IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count();


        /**  2. 先并行 再串行的场景 **/
        // 现在要实现一个效果:先并行,再串行
        // 多次调用 parallel / sequential , 以最后一次调用为准
        // IntStream.range(1,100);
        //调用parallel产生并行流
        // .parallel().peek(StreamDemo5::debug)
        // sequential 产生串行流
        // .sequential().peek(StreamDemo5::debug2)
        // .count();

        /** 3. 使用线程, 默认是 系统默认的线程 **/

        /** 并行流使用的线程池: ForkJoinPool.commonPool **/
        /** 默认的线程数是 当前机器的CPU个数 **/
        /** 使用这个属性可以修改默认的线程数 **/
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");


        /**
         * 4. 自定义选择线程池
         */

        /** 并行流式用的系统默认的同一个线程池 **/
        /** 使用自己的线程池,不使用默认线程池,防止任务被阻塞 **/
        ForkJoinPool pool = new ForkJoinPool(20);
        pool.submit(() -> IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count());
        pool.shutdown();
        synchronized (pool) {
            try {
                pool.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void debug(int i) {
        System.out.println("debug" + i);
        System.out.println(Thread.currentThread().getName() + "debug" + i);

    }
}

```
##### 收集器
-   统计
-   分块
-   分组
```
package stream;

/** Created By  
 * 醉美柳舞之众星捧月 
 * @author song
 * @date 2019/1/8 19:50
 */

/**
 * Author song
 * Date & Time  2019/1/8 19:56
 * Description  收集器  
 */

import java.util.*;
import java.util.stream.Collectors;

/** 学生 对象 */
class Student {
    /** 姓名 */
    private String name;

    /** 年龄 */
    private int age;

    /** 性别 */
    private Gender gender;

    /** 班级 */
    private Grade grade;

    public Student(String name, int age, Gender gender, Grade grade) {
        super();
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.grade = grade;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Grade getGrade() {
        return grade;
    }

    public void setGrade(Grade grade) {
        this.grade = grade;
    }

    public Gender getGender() {
        return gender;
    }

    public void setGender(Gender gender) {
        this.gender = gender;
    }

    @Override
    public String toString() {
        return "[name=" + name + ", age=" + age + ", gender=" + gender
                + ", grade=" + grade + "]";
    }
}

/** 性别 */
enum Gender {MALE, FEMALE}

/** 班级 */
enum Grade {ONE, TWO, THREE, FOUR;}

public class CollectDemo {

    public static void main(String[] args) {
        // 测试数据
        List<Student> students = Arrays.asList(
                new Student("小明", 10, Gender.MALE, Grade.ONE),
                new Student("大明", 9, Gender.MALE, Grade.THREE),
                new Student("小白", 8, Gender.FEMALE, Grade.TWO),
                new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
                new Student("小红", 7, Gender.FEMALE, Grade.THREE),
                new Student("小黄", 13, Gender.MALE, Grade.ONE),
                new Student("小青", 13, Gender.FEMALE, Grade.THREE),
                new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
                new Student("小王", 6, Gender.MALE, Grade.ONE),
                new Student("小李", 6, Gender.MALE, Grade.ONE),
                new Student("小马", 14, Gender.FEMALE, Grade.FOUR),
                new Student("小刘", 13, Gender.MALE, Grade.FOUR));

        // 得到所有学生的年龄列表
        // s -> s.getAge() --> Student::getAge , 不会多生成一个类似 lambda$0这样的函数
        Set<Integer> ages = students.stream().map(Student::getAge)
                .collect(Collectors.toCollection(TreeSet::new));
        System.out.println("所有学生的年龄:" + ages);

        // 统计汇总信息
        IntSummaryStatistics agesSummaryStatistics = students.stream()
                .collect(Collectors.summarizingInt(Student::getAge));
        System.out.println("年龄汇总信息:" + agesSummaryStatistics);

        // 分块
        Map<Boolean, List<Student>> genders = students.stream().collect(
                Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
        // System.out.println("男女学生列表:" + genders);
//        MapUtils.verbosePrint(System.out, "男女学生列表", genders);

        // 分组
        Map<Grade, List<Student>> grades = students.stream()
                .collect(Collectors.groupingBy(Student::getGrade));
//        MapUtils.verbosePrint(System.out, "学生班级列表", grades);

        // 得到所有班级学生的个数
        Map<Grade, Long> gradesCount = students.stream().collect(Collectors
                .groupingBy(Student::getGrade, Collectors.counting()));
//        MapUtils.verbosePrint(System.out, "班级学生个数列表", gradesCount);
// 如果对某个字段求和则是:Collectors.summingInt(Student::getAge);

    }
}

```
##### Lambda运行机制
-  1 .所有操作是链式调用, 一个元素只迭代一次
-   2 .每一个中间操作返回一个新的流. 流里面有一个属性sourceStage 指向同一个 地方,就是Head
-   3 .Head->nextStage->nextStage->... -> null
-   4 .有状态操作会把无状态操作<code>截断</code>,单独处理
-   5 .并行环境下, 有<code>状态的中间操作</code>不一定能并行操作.
-   6 .parallel/ sequetial这2个操作也是中间操作(也是返回stream)但是他们不创建流, 他们只修改 Head的并行标志
-   0 .个人理解:::lambda的运行,是无数个反复循环的单例。不会所有的一下处理了,再所有一下处理下一个。而是,一条处理了又来一条,这样往返。
-   0 .Stream中传入一个参数的,一般为无状态操作。传入两个参数的,一般都是有状态操作

```
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 验证stream运行机制
 * <p>
 * 1. 所有操作是链式调用, 一个元素只迭代一次
 * 2. 每一个中间操作返回一个新的流. 流里面有一个属性sourceStage
 * 指向同一个 地方,就是Head
 * 3. Head->nextStage->nextStage->... -> null
 * 4. 有状态操作会把无状态操作阶段,单独处理
 * 5. 并行环境下, 有状态的中间操作不一定能并行操作.
 * <p>
 * 6. parallel/ sequetial 这2个操作也是中间操作(也是返回stream)
 * 但是他们不创建流, 他们只修改 Head的并行标志
 *
 * @author 晓风轻
 */
public class RunStream {

    public static void main(String[] args) {
        Random random = new Random();
        // 随机产生数据
        Stream<Integer> stream = Stream.generate(() -> random.nextInt())
                // 产生500个 ( 无限流需要短路操作. )
                .limit(500)
                // 第1个无状态操作
                .peek(s -> print("peek: " + s))
                // 第2个无状态操作
                .filter(s -> {
                    print("filter: " + s);
                    return s > 1000000;
                })
                // 有状态操作
                .sorted((i1, i2) -> {
                    print("排序: " + i1 + ", " + i2);
                    return i1.compareTo(i2);
                })
                // 又一个无状态操作
                .peek(s -> {
                    print("peek2: " + s);
                }).parallel();

        // 终止操作
        stream.count();
    }

    /**
     * 打印日志并sleep 5 毫秒
     *
     * @param s
     */
    public static void print(String s) {
        // System.out.println(s);
        // 带线程名(测试并行情况)
        System.out.println(Thread.currentThread().getName() + " > " + s);
        try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {
        }
    }
}
```

##### 小结
1.

-   惰性求值
-   中间操作  - 有状态\无状态
-   终止操作 短路
-   parallel/sequential   不创建流,  而只是一个Head

2.

-   收集器 - 分组/分块/统计

3.运行机制

-   链式 , Head -> nextStage
-   并行 fork/join  阻塞

0

评论区