Temporal简单实例


目的

学习temporal基本概念和用法

前言

公司2个项目用到了temporal,之前收藏的一些temporal相关文章一直没看,今天好好学习了一下并写一个简单的demo

什么是temporal

Temporal是一种分布式、可扩展、持久且高度可用的编排引擎,用于以可扩展和弹性的方式执行一步长时间运行的业务逻辑。

Temporal Go SDK是使用Go语言创作工作流活动的框架。本文将使用temporal GO SDK实现一个简单的demo

temporal原理

由四部分组成Start、Temporal Server、Worker、Bank

  • Start:工作流的创建者/发起者。可以将不同的Activity(也就是Worker实现的具体的执行逻辑模块,每一个Activity都有一个名字)组织成一个Workflow,并且开启workflow。
  • Temporal Server:存储所有工作流的数据、状态的中间件,整个工作依赖于该server(后续简写为TS)
  • Worker:实际进行逻辑处理的执行者。该模块实现具体的执行逻辑,并且在启动的时候注册到TS。
  • Bank:官方给的示例,可以理解为DB,TS的存储模块。

具体的流程描述:
1.启动Temporal Server。
2.启动Temporal Client,也就是Worder,监听TS,循环获取待执行的工作流。
3.Start创建一个工作流,封装参数,调用sdk的api(rpc)发送到TS。Worker拉取到工作流开始逻辑处理

woker是Activity和Workflow的包装,worker的唯一工作就是执行Activity和Workflow并将结果返回给TS。
一个Wokeflow包含多个Activity,对Activity进行编排,多个Activity可以并行,也可以同步(阻塞到都某个Activity执行完毕)。其底层会阻塞到Future.Get()方法上。

Activity一定是在worker里面的,因为逻辑的具体执行者一定是worker。workflow不一定要封装在worker里,可以单独拿出来,实现Activity的编排,worker只管一个一个Activity的具体实现即可。

启动Temporal Service

本地docker启动

$ git clone https://github.com/temporalio/docker-compose.git 
$ cd docker-compose 
$ docker-compose up

使用temporal的命令行工具与本地TS交互

$ alias tctl="docker exec temporal-admin-tools tctl" 
$ tctl namespace list 
$ tctl workflow list

网页版交互

http://localhost:8088

upload successful

简单实现

代码结构

├── activity
│   └── activity.go // 实现具体的逻辑
├── entity
│   └── student.go // 这里定义了一个实体
├── go.mod
├── go.sum
├── main.go   // Start:启动一个工作流
├── pkg
│   └── workflow
│       ├── worker_client.go // 对worker的封装。封装注册Activity,注册Workflow
│       └── workflow_client.go // 对workflow封装。开始工作流
├── worker
│   └── worker.go // worker的启动入口:注册activity,workflow,启动监听Temporal Service
└── workflow
    └── workflow.go // 实现一个工作流。里面实现了对activity的编排、调用

源代码
temporal

如果没有权限可评论区联系我。

开启一个任务栏

➜  temporal_demo git:(master) go run main.go
2022/07/30 20:03:57 INFO  No logger configured for temporal client. Created default one.
ID:bfd5cd35-b621-4037-b7c8-8ac5c9850410, RunID:c3a636d3-f750-4da4-b237-470fb5bcb3b1

开启一个worker

➜  temporal_demo git:(master) go run worker/worker.go
2022/07/30 20:03:55 INFO  No logger configured for temporal client. Created default one.
2022/07/30 20:03:55 INFO  Started Worker Namespace default TaskQueue abc-test WorkerID 85630@xxxx-Pro.local@
2022/07/30 20:03:55 WARN  Failed to process workflow task. Namespace default TaskQueue abc-test WorkerID 85630@xxxx-Pro.local@ WorkflowType PrintActivity WorkflowID jdfdjsifj RunID 5b90f072-0a8d-4a1f-b694-0f4e06f562e7 Attempt 118 Error unable to find workflow type: PrintActivity. Supported types: [HandleStudent]
2022/07/30 20:03:57 DEBUG ExecuteActivity Namespace default TaskQueue abc-test WorkerID 85630@xxxx-Pro.local@ WorkflowType HandleStudent WorkflowID bfd5cd35-b621-4037-b7c8-8ac5c9850410 RunID c3a636d3-f750-4da4-b237-470fb5bcb3b1 Attempt 1 ActivityID 5 ActivityType PrintActivity1
this is PrintActivity1 func. student: map[Age:18 Name:jobs]
2022/07/30 20:03:57 DEBUG ExecuteActivity Namespace default TaskQueue abc-test WorkerID 85630@xxxx-Pro.local@ WorkflowType HandleStudent WorkflowID bfd5cd35-b621-4037-b7c8-8ac5c9850410 RunID c3a636d3-f750-4da4-b237-470fb5bcb3b1 Attempt 1 ActivityID 11 ActivityType PrintActivity2

发现worker执行了这个任务流

网页查看

upload successful

注:如果启动多个worker,则同一个工作流中的多个activity可能在不同的worker中执行。

参考

[1]Temporal框架学习

[2]Documentation

[3]https://github.com/temporalio/temporal

[4]一种实用的 Temporal 架构方法

[5]Workflows 工作流

[6]源代码


文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录