松花皮蛋的黑板报
  • 分享在京东工作的技术感悟,还有JAVA技术和业内最佳实践,大部分都是务实的、能看懂的、可复现的

扫一扫
关注公众号

Flux反应式编程结合多线程实现任务编排

博客首页文章列表 松花皮蛋me 2019-03-12 18:51

一、任务编排

如果仅仅是批量执行一段命令、脚本,是不能称之为自动化运维的。运维过程中,涉及到一些复杂、需要日常重复性的工作,则可以通过任务编排来处理。任务编排可以将复杂的作业节点编排成任务,设定触发条件和时间,满足更为灵活的应用场景

二、反应式编程

任务实体

import lombok.Data;
import java.util.List;

@Data
public class Task {
    private Integer id;
    private Integer jobId;
    private String name;
    private String command;
    private Integer order;
    private String failHandle = "break";
    private List<String> requires;
    private Boolean preArgs = true;
}

假设一个job中需要运行多个task,task间有order、requires依赖关系,传统编程模式中,最容易想到的就是回调编程,但是如果要达到效果将导致多层回调内嵌,维护成本和可读性极差

ExecutorService executor = Executors.newFixedThreadPool(1);
    ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
    ListenableFuture<String> lf = listeningExecutor.submit(new Callable<String>()  {
        @Override
        public String call() {

        }
    });
    Futures.addCallback(lf, new FutureCallback<String>()  {
        @Override
        public void onFailure(Throwable t) {

        }
        @Override
        public void onSuccess(String s) {
        }
    });

随着反应式编程(Reactive Programming)这种新的编程范式的出现,有了新的解决方案。

我们一般通过迭代器(Iterator)模式来遍历一个序列,这种遍历方式是由调用者来控制节奏的,采用的是拉的方式,每次由调用者通过 next()方法来获取序列中的下一个值。使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行

官方的例子

userService.getFavorites(userId) ➊
           .flatMap(favoriteService::getDetails)  ➋
           .switchIfEmpty(suggestionService.getSuggestions())  ➌
           .take(5)  ➍
           .publishOn(UiUtils.uiThreadScheduler())  ➎
           .subscribe(uiList::show, UiUtils::errorPopup);  ➏

➊ 根据用户ID获得喜欢的信息(打开一个 Publisher)
➋ 使用 flatMap 操作获得详情信息
➌ 使用 switchIfEmpty 操作,在没有喜欢数据的情况下,采用系统推荐的方式获得
➍ 取前五个
➎ 在 uiThread 上进行发布
➏ 最终的消费行为

三、代码实现

主方法:

@SpringBootApplication
public class WorkflowApplication implements CommandLineRunner {

public static void main(String[] args) {
    SpringApplication.run(WorkflowApplication.class, args);

}

static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);

@Autowired
private TaskSubcriber taskSubcriber;


@Override
public void run(String... args) throws Exception {
    List taskList = new ArrayList();
    Task task1 = new Task();
    task1.setName("task1");
    task1.setCommand("command1");
    task1.setOrder(1);

    Task task2 = new Task();
    task2.setName("task2");
    task2.setCommand("command2");
    task2.setRequires(Arrays.asList("task1"));
    task2.setOrder(2);

    Task task3 = new Task();
    task3.setName("task3");
    task3.setCommand("command3");
    task3.setOrder(3);

    taskList.add(task1);
    taskList.add(task2);
    taskList.add(task3);

    BaseService baseService = new BaseService();
    ExecutorService executor = Executors.newFixedThreadPool(3);
    executor.execute(new Runnable() {
        @Override
        public void run() {
            Flux.fromArray(taskList.toArray()).filter(o -> {
                return baseService.requireFilter((Task) o,true);
            }).subscribe(taskSubcriber);
        }
    });
    executor.execute(new Runnable() {
        @Override
        public void run() {
            Flux.interval(Duration.ofSeconds(0),Duration.ofSeconds(0)).fromArray(taskList.toArray()).filter(o -> {
                return baseService.requireFilter((Task) o,false);
            }).subscribe(taskSubcriber);
        }
    });


}


}

消费层

@Component
public class TaskSubcriber implements Subscriber<Object> {

private static final Logger logger = LoggerFactory.getLogger(TaskSubcriber.class);

private static Object lock = new Object();

@Override
public void onSubscribe(Subscription subscription) {
    logger.info("onSubscribe subscription:{}",subscription);
    subscription.request(Integer.MAX_VALUE);
}

@Override
public void onNext(Object object) {
    logger.info("onNext list:{}",object);
    Task task = (Task) object;
    if(task.getRequires()==null) {
        NoRelyTaskService noRelyTaskService = new NoRelyTaskService(lock,task);
        Thread thread = new Thread(noRelyTaskService);
        thread.start();
      } else {
        RelyTaskService relyTaskService = new RelyTaskService(lock,task);
        Thread thread = new Thread(relyTaskService);
        thread.start();
    }
}

@Override
public void onError(Throwable throwable) {
    throwable.printStackTrace();
}

@Override
public void onComplete() {
    logger.info("onComplete :{}");
}
}

不依赖其他任务的任务执行层

public class NoRelyTaskService extends BaseService implements   Runnable {

private  Object lock = null;
private Task task;

public NoRelyTaskService(Object l,Task t)
{
    lock = l;
    task = t;
}

@Override
public void run()
{
    synchronized (lock) {
        System.out.println("线程:"+Thread.currentThread().getName()+",命令:"+task.getCommand());
        lock.notifyAll();
    }

}
}   

依赖其他任务的任务执行层

public class RelyTaskService extends BaseService implements     Runnable{

private  Object lock = null;
private Task task;

public RelyTaskService(Object l,Task t)
{
    lock = l;
    task = t;
}

@Override
public void  run()
{
    synchronized (lock) {
        try {
            List<String> requires = task.getRequires();
            while (state(requires)==false) {
                System.out.println("线程"+Thread.currentThread().getName()+"开始等待");
                lock.wait();
            }
            System.out.println("线程:"+Thread.currentThread().getName()+",命令:"+task.getCommand());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


}

四、多线程需要注意的地方

1、初学者理解wait()的时候都认为是将当前线程阻塞,所以Thread.currentThread().wait();视乎很有道理。但是不知道大家有没有发现,在JDK类库中wait()和notify()方法并不是Thread类的,而是Object()中的。在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,当前线程等待

2、始终使用while循环来调用wait方法,永远不要在循环外调用wait方法,这样做的原因是尽管并不满足条件,但是由于其他线程调用notifyAll方法会导致被阻塞线程意外唤醒,此时执行条件不满足,它会导致约束失效

3、唤醒线程,应该使用notify还是notifyAll?notify会随机通知等待队列中的一个线程,而notifyAll会通知等待队列中所有线程,可知notify是有风险的 ,可能导致某些线程永远不会被通知到

4、当前线程必须拥有此对象监视器,然后才可以放弃对此监视器的所有权并等待 ,直到其他线程通过调用notify方法或notifyAll方法通知在此对象的监视器上等待的线程醒来,然后该线程将等到重新获得对监视器的所有权后才能继续执行。否则会报IllegalMonitorStateException 错误