JobServer Source Code Analysis: Pitfalls and Practice

Prerequisites

Scala

Familiarity with Scala pattern matching, collections, Future, and Option.
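
As a minimal refresher, one snippet touching all four (all names here are illustrative, not from jobserver):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Option + pattern matching
val maybeName: Option[String] = Some("jobserver")
val greeting = maybeName match {
  case Some(n) => s"hello, $n"
  case None    => "hello, stranger"
}

// Collections: filter a List
val evens = List(1, 2, 3, 4).filter(_ % 2 == 0)

// Future: an async computation; we block only for the demo
val f: Future[Int] = Future { 21 * 2 }
val answer = Await.result(f, 1.second)

println(greeting) // hello, jobserver
println(evens)    // List(2, 4)
println(answer)   // 42
```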

spray

spray is a RESTful web framework, similar to Spring Boot. It ships the spray-can server and the spray-routing DSL for defining custom routes, plus modules for caching, low-level IO, JSON, and web testing.

Akka and Akka Cluster

actor

Fire-forget

actorRef ! message

Send-And-Receive-Future

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val timeout = Timeout(5 seconds)
val future = myActor ? AskNameMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
println(result)
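
Blocking with Await.result inside an actor is generally discouraged; the non-blocking style transforms the future instead. A stdlib-only sketch of that idea, with the ask simulated by a plain Future (no Akka dependency):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for `myActor ? AskNameMessage`: a future that completes with the reply
val future: Future[Any] = Future { "Fred" }

// Non-blocking: react to the reply when it arrives instead of waiting for it
val nameLength: Future[Int] = future.map {
  case s: String => s.length
  case other     => sys.error(s"unexpected reply: $other")
}

// Only the demo blocks, at the very end, to observe the result
val len = Await.result(nameLength, 1.second)
println(len) // 4
```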

The gossip protocol
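
Akka Cluster disseminates membership state via a gossip protocol: each node periodically merges its view of the cluster with a randomly chosen peer until all views converge. A toy, non-Akka simulation of that convergence (purely illustrative):

```scala
import scala.util.Random

// Each node's "view" is the set of members it knows about; initially only itself
val rng = new Random(42)
var views: Vector[Set[String]] = Vector(Set("a"), Set("b"), Set("c"), Set("d"))

var rounds = 0
// One gossip round: every node merges views with one random peer
while (views.distinct.size > 1 && rounds < 100) {
  for (i <- views.indices) {
    val j = rng.nextInt(views.size)
    val merged = views(i) ++ views(j)
    views = views.updated(i, merged).updated(j, merged)
  }
  rounds += 1
}

println(s"converged after $rounds rounds: ${views.head}")
```

Information spreads exponentially fast, which is why membership converges in a handful of rounds even for large clusters.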

akka cluster

JobServer Source Code Analysis

Architecture diagram

workflow

API

GET /jars - lists all the jars and the last upload timestamp
POST /jars/<appName> - uploads a new jar under <appName>
GET /contexts - lists all current contexts
POST /contexts/<name> - creates a new context
DELETE /contexts/<name> - stops a context and all jobs running in it
PUT /contexts?reset=reboot - kills all contexts and re-loads only the contexts from config
GET /jobs - Lists the last N jobs
POST /jobs - Starts a new job, use ?sync=true to wait for results
GET /jobs/<jobId> - Gets the result or status of a specific job
DELETE /jobs/<jobId> - Kills the specified job
GET /jobs/<jobId>/config - Gets the job configuration

yarn-cluster

yarn-client

Main flows

Starting jobserver for the first time

cmd='$SPARK_HOME/bin/spark-submit --class spark.jobserver.JobServer --master yarn-client --driver-memory 50G
--conf "spark.executor.extraJavaOptions=$LOGGING_OPTS"
--driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES"
$@ $appdir/spark-job-server.jar /home/spark/job-server/environment.conf '

How spark-submit handles this

In client mode, spark-submit launches the main class directly in the local process.

A walk through the JobServer main function

def makeSupervisorSystem(name: String)(config: Config): ActorSystem = {
  val configWithRole = config.withValue("akka.cluster.roles",
    ConfigValueFactory.fromIterable(List("supervisor").asJava))
  ActorSystem(name, configWithRole)
}
start(args, makeSupervisorSystem("JobServer")(_))

This creates the ActorSystem, tagging it with the cluster role "supervisor".

Inside the start function:

val clazz = Class.forName(config.getString("spark.jobserver.jobdao"))
val ctor = clazz.getDeclaredConstructor(Class.forName("com.typesafe.config.Config"))
try {
  val jobDAO = ctor.newInstance(config).asInstanceOf[JobDAO]
  val daoActor = system.actorOf(Props(classOf[JobDAOActor], jobDAO), "dao-manager")
  val dataManager = system.actorOf(Props(classOf[DataManagerActor],
    new DataFileDAO(config)), "data-manager")
  val jarManager = system.actorOf(Props(classOf[JarManager], daoActor), "jar-manager")
  val contextPerJvm = config.getBoolean("spark.jobserver.context-per-jvm")
  val supervisor =
    system.actorOf(Props(if (contextPerJvm) { classOf[AkkaClusterSupervisorActor] }
                         else { classOf[LocalContextSupervisorActor] }, daoActor),
                   "context-supervisor")
  val jobInfo = system.actorOf(Props(classOf[JobInfoActor], jobDAO, supervisor), "job-info")
  // Add initial job JARs, if specified in configuration.
  storeInitialJars(config, jarManager)
  // Create initial contexts
  supervisor ! ContextSupervisor.AddContextsFromConfig
  new WebApi(system, config, port, jarManager, dataManager, supervisor, jobInfo).start()
  • Creates the DAO, a trait for persisting data; the concrete implementation (flat files, PostgreSQL, H2, Cassandra, etc.) is chosen from the config file
  • Creates the DAO actor, which responds to data read/write messages
  • dataManager: the actor responsible for caching jar files
  • jarManager: the actor that stores and retrieves jars from persistent storage
  • supervisor: the top-level supervisor, responsible for creating actors. Depending on config it is either a LocalContextSupervisorActor or the highly available AkkaClusterSupervisorActor; the latter runs each context in its own JVM for process isolation
  • Starts the spray web app, passing the supervisor reference in as a global

Creating a context

post {
  path(Segment) { (contextName) =>
    {
      parameterMap { (params) =>
        val config = ConfigFactory.parseMap(params.asJava).resolve()
        val future = (supervisor ? AddContext(contextName, config))(contextTimeout.seconds)
        respondWithMediaType(MediaTypes.`application/json`) { ctx =>
          future.map {
            case ContextInitialized => ctx.complete(StatusCodes.OK)
            case ContextAlreadyExists => badRequest(ctx, "context " + contextName + " exists")
            case ContextInitError(e) => ctx.complete(500, errMap(e, "CONTEXT INIT ERROR"))
          }
        }
      }
    }
  }
} ~

1. The client's HTTP request first arrives at spray, which holds an ActorRef to the supervisor (AkkaClusterSupervisorActor) and sends it an AddContext message.

private val cluster = Cluster(context.system)
private val selfAddress = cluster.selfAddress
def wrappedReceive: Receive = {
  case AddContext(name, contextConfig) =>
    val originator = sender()
    startContext(name, mergedConfig, false) { ref =>
      originator ! ContextInitialized
    } { err =>
      originator ! ContextInitError(err)
    }

The startContext function:

val pb = Process(cmdString)
val pio = new ProcessIO(_ => (),
  stdout => scala.io.Source.fromInputStream(stdout)
    .getLines.foreach(println),
  stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
logger.info("Starting to execute sub process {}", pb)
val processStart = Try {
  val process = pb.run(pio)
}
if (processStart.isSuccess) {
  contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
} else {
  failureFunc(processStart.failed.get)
}

2. When AkkaClusterSupervisorActor matches the message, startContext locates the shell script, reads it in, and invokes it, passing the cluster address as an argument so the jobmanager actor can later join the cluster.
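
The Process/ProcessIO pattern above comes from scala.sys.process. A self-contained sketch of launching a child process the same way, substituting `echo` for the spark-submit script:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.sys.process.{Process, ProcessIO}
import scala.util.Try

val out = new ConcurrentLinkedQueue[String]()

// Wire up stdin/stdout/stderr the same way startContext does
val pio = new ProcessIO(
  _ => (),                                                                        // ignore stdin
  stdout => scala.io.Source.fromInputStream(stdout).getLines().foreach(out.add),  // collect stdout
  stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))  // echo stderr

val processStart = Try {
  val process = Process(Seq("echo", "hello from child")).run(pio)
  process.exitValue() // block until the child exits (jobserver instead waits for a cluster join)
}

Thread.sleep(300) // give the stdout-reader thread time to drain
println(processStart.isSuccess)
println(out.peek())
```

Note that jobserver's `Try` only guards process *launch*; failures inside the child surface later, via the cluster-join timeout rather than `processStart`.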

MAIN="spark.jobserver.JobManager"
cmd='$SPARK_HOME/bin/spark-submit --class $MAIN --driver-memory 40G
--conf "spark.executor.extraJavaOptions=$LOGGING_OPTS"
--driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES"
$appdir/spark-job-server.jar $1 $2 $conffile'
val jobManager = system.actorOf(JobManagerActor.props(contextConfig), managerName)
Cluster(system).join(clusterAddress)

3. The shell script calls spark-submit in yarn-client mode to run the main class spark.jobserver.JobManager, whose main function creates the jobmanager actor and joins it to the cluster.

override def preStart(): Unit = {
  cluster.join(selfAddress)
  cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent])
}
override def postStop(): Unit = {
  cluster.unsubscribe(self)
  cluster.leave(selfAddress)
}
case MemberUp(member) =>
  if (member.hasRole("manager")) {
    val memberActors = RootActorPath(member.address) / "user" / "*"
    context.actorSelection(memberActors) ! Identify(memberActors)
  }
case ActorIdentity(memberActors, actorRefOpt) =>
  actorRefOpt.map { actorRef =>
    val actorName = actorRef.path.name
    if (actorName.startsWith("jobManager")) {
      logger.info("Received identify response, attempting to initialize context at {}", memberActors)
      (for { (isAdHoc, successFunc, failureFunc) <- contextInitInfos.remove(actorName) }
       yield {
         initContext(actorName, actorRef, contextInitTimeout)(isAdHoc, successFunc, failureFunc)
       }).getOrElse({
        logger.warn("No initialization or callback found for jobManager actor {}", actorRef.path)
        actorRef ! PoisonPill
      })
    }
  }
private def initContext(actorName: String, ref: ActorRef, timeoutSecs: Long = 1)
                       (isAdHoc: Boolean,
                        successFunc: ActorRef => Unit,
                        failureFunc: Throwable => Unit): Unit = {
  import akka.pattern.ask
  val resultActor = if (isAdHoc) globalResultActor else context.actorOf(Props(classOf[JobResultActor]))
  (ref ? JobManagerActor.Initialize(
    daoActor, Some(resultActor)))(Timeout(timeoutSecs.second)).onComplete {
    case Failure(e: Exception) =>
      logger.info("Failed to send initialize message to context " + ref, e)
      ref ! PoisonPill
      failureFunc(e)
    case Success(JobManagerActor.InitError(t)) =>
      logger.info("Failed to initialize context " + ref, t)
      ref ! PoisonPill
      failureFunc(t)
    case Success(JobManagerActor.Initialized(ctxName, resActor)) =>
      logger.info("SparkContext {} joined", ctxName)
      contexts(ctxName) = (ref, resActor)
      context.watch(ref)
      successFunc(ref)
    case _ =>
      logger.info("Failed for unknown reason.")
      ref ! PoisonPill
      failureFunc(new RuntimeException("Failed for unknown reason."))
  }
}

4. AkkaClusterSupervisorActor's lifecycle hooks subscribe to the cluster's MemberEvent stream. When a jobmanager joins the cluster, the supervisor receives MemberUp, resolves an ActorSelection for the new actor through the `*` wildcard path, and sends it the built-in Identify message. The jobmanager actor replies with its own reference; with that ActorRef in hand, the supervisor sends the Initialize message. If initialization succeeds it records success; otherwise it kills the actor with a PoisonPill.

case Initialize(dao, resOpt) =>
  daoActor = dao
  statusActor = context.actorOf(JobStatusActor.props(daoActor))
  resultActor = resOpt.getOrElse(context.actorOf(Props[JobResultActor]))
  try {
    // Load side jars first in case the ContextFactory comes from it
    getSideJars(contextConfig).foreach { jarUri =>
      jarLoader.addURL(new URL(convertJarUriSparkToJava(jarUri)))
    }
    jobContext = createContextFromConfig()
    sparkEnv = SparkEnv.get
    jobCache = new JobCache(jobCacheSize, daoActor, jobContext.sparkContext, jarLoader)
    getSideJars(contextConfig).foreach { jarUri => jobContext.sparkContext.addJar(jarUri) }
    sender ! Initialized(contextName, resultActor)
  } catch {
    case t: Throwable =>
      logger.error("Failed to create context " + contextName + ", shutting down actor", t)
      sender ! InitError(t)
      self ! PoisonPill
  }
def createContextFromConfig(contextName: String = contextName): ContextLike = {
  val factoryClassName = contextConfig.getString("context-factory")
  val factoryClass = jarLoader.loadClass(factoryClassName)
  val factory = factoryClass.newInstance.asInstanceOf[spark.jobserver.context.SparkContextFactory]
  Thread.currentThread.setContextClassLoader(jarLoader)
  factory.makeContext(config, contextConfig, contextName)
}
def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
  val sc = new SparkContext(sparkConf) with ContextLike {
    def sparkContext: SparkContext = this
    def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SparkJob]
  }
  for ((k, v) <- SparkJobUtils.getHadoopConfig(config)) sc.hadoopConfiguration.set(k, v)
  sc
}

5. On receiving the Initialize message, the new jobmanager actor constructs a SparkContext, which represents one Spark context: the object responsible for DAG stage splitting, scheduling, and running jobs.

private val contexts = mutable.HashMap.empty[String, (ActorRef, ActorRef)]
case Success(JobManagerActor.Initialized(ctxName, resActor)) =>
  logger.info("SparkContext {} joined", ctxName)
  contexts(ctxName) = (ref, resActor)

6. AkkaClusterSupervisorActor stores the jobmanager ActorRef in its contexts map for later use.

Running a job via curl

post {
  val jobManager = getJobManagerForContext(contextOpt, contextConfig, classPath)
  val future = jobManager.get.ask(
    JobManagerActor.StartJob(appName, classPath, jobConfig, events))(timeout)
  future.map {
    case JobResult(_, res) =>
      res match {
        case s: Stream[_] => sendStreamingResponse(ctx, ResultChunkSize,
          resultToByteIterator(Map.empty, s.toIterator))
        case _ => ctx.complete(resultToTable(res))
      }
    case JobErroredOut(_, _, ex) => ctx.complete(errMap(ex, "ERROR"))
    case JobStarted(jobId, context, _) =>
      jobInfo ! StoreJobConfig(jobId, postedJobConfig)
      ctx.complete(202, Map[String, Any](
        StatusKey -> "STARTED",
        ResultKey -> Map("jobId" -> jobId, "context" -> context)))
    case JobValidationFailed(_, _, ex) =>
      ctx.complete(400, errMap(ex, "VALIDATION FAILED"))
    case NoSuchApplication => notFound(ctx, "appName " + appName + " not found")
    case NoSuchClass => notFound(ctx, "classPath " + classPath + " not found")
    case WrongJobType =>
      ctx.complete(400, errMap("Invalid job type for this context"))
    case JobLoadingError(err) =>
      ctx.complete(500, errMap(err, "JOB LOADING FAILED"))
    case NoJobSlotsAvailable(maxJobSlots) =>
      val errorMsg = "Too many running jobs (" + maxJobSlots.toString +
        ") for job context '" + contextOpt.getOrElse("ad-hoc") + "'"
      ctx.complete(503, Map(StatusKey -> "NO SLOTS AVAILABLE", ResultKey -> errorMsg))
    case ContextInitError(e) => ctx.complete(500, errMap(e, "CONTEXT INIT FAILED"))
  }

1. The web layer asks AkkaClusterSupervisorActor for the jobmanager ActorRef matching the given context name, then sends that actor a StartJob message.

case StartJob(appName, classPath, jobConfig, events) => {
  startJobInternal(appName, classPath, jobConfig, events, jobContext, sparkEnv)
}
resultActor ! Subscribe(jobId, sender, events)
statusActor ! Subscribe(jobId, sender, events)
Future {
  val job = constructor()
  if (job.isInstanceOf[NamedObjectSupport]) {
    val namedObjects = job.asInstanceOf[NamedObjectSupport].namedObjectsPrivate
    if (namedObjects.get() == null) {
      namedObjects.compareAndSet(null, jobServerNamedObjects)
    }
  }
  try {
    statusActor ! JobStatusActor.JobInit(jobInfo)
    val jobC = jobContext.asInstanceOf[job.C]
    job.validate(jobC, jobConfig) match {
      case SparkJobInvalid(reason) => {
        val err = new Throwable(reason)
        statusActor ! JobValidationFailed(jobId, DateTime.now(), err)
        throw err
      }
      case SparkJobValid => {
        statusActor ! JobStarted(jobId: String, contextName, jobInfo.startTime)
        val sc = jobContext.sparkContext
        sc.setJobGroup(jobId, s"Job group for $jobId and spark context ${sc.applicationId}", true)
        job.runJob(jobC, jobConfig)
      }
    }
  }
}(executionContext).andThen {
  case Success(result: Any) =>
    statusActor ! JobFinished(jobId, DateTime.now())
    resultActor ! JobResult(jobId, result)
}(executionContext).andThen {
  case _ =>
    resultActor ! Unsubscribe(jobId, subscriber)
    statusActor ! Unsubscribe(jobId, subscriber)
    currentRunningJobs.getAndDecrement()
    postEachJob()
}(executionContext)

// The user's job class implements:
override def runJob(sc: SparkContext, config: Config): Any = {

2. The jobmanager actor matches the StartJob message and calls startJobInternal, which subscribes the sender to its resultActor and statusActor. Each jobmanager has its own resultActor and statusActor; job ids are monotonically increasing, and both actors persist the job id and related state.
It then validates the user's job class, which is built on the jobserver framework: the class must extend the framework trait and implement runJob, classic inversion of control ("don't call us, we'll call you").
The framework passes the jobmanager's SparkContext instance into runJob, and when the job finishes it updates the status and the result.
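
The validate-then-run contract can be sketched with the same shape jobserver uses. This is a standalone toy (ToyJob, WordCountJob, and the Map-based input are all made up for illustration; the real traits take a SparkContext and a typesafe Config):

```scala
// Validation result ADT, mirroring SparkJobValid / SparkJobInvalid
sealed trait JobValidation
case object JobValid extends JobValidation
case class JobInvalid(reason: String) extends JobValidation

// A job in the framework's style: the framework calls validate, then runJob
trait ToyJob {
  def validate(input: Map[String, String]): JobValidation
  def runJob(input: Map[String, String]): Any
}

object WordCountJob extends ToyJob {
  def validate(input: Map[String, String]): JobValidation =
    if (input.contains("text")) JobValid else JobInvalid("missing 'text' param")
  def runJob(input: Map[String, String]): Any =
    input("text").split("\\s+").length
}

// The framework side: don't call us, we'll call you
def run(job: ToyJob, input: Map[String, String]): Either[String, Any] =
  job.validate(input) match {
    case JobInvalid(reason) => Left(reason)
    case JobValid           => Right(job.runJob(input))
  }

val ok  = run(WordCountJob, Map("text" -> "hello job server"))
val bad = run(WordCountJob, Map.empty)
println(ok)  // Right(3)
println(bad) // Left(missing 'text' param)
```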

It is worth comparing with Spark Streaming here: Spark Streaming is itself just a framework built on Spark Core, and it launches jobs at sub-second intervals, which gives a rough sense of the job throughput achievable here.

3. Querying jobs goes through JobInfoActor, which simply reads each job's information from the persistence layer.

Jar management

Overall flow: when a jar is uploaded, the DAO implementation writes the byte stream to a file or database, and a copy is placed in an in-memory cache.
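The upload path described above (persist the bytes, then cache them in memory) can be sketched as follows. This is a stdlib-only sketch: `saveJar` and the temp-file write stand in for the real DAO implementations:

```scala
import java.nio.file.Files
import scala.collection.mutable

// In-memory cache alongside durable storage, as in the jar upload flow
val cache = mutable.HashMap.empty[String, Array[Byte]]

def saveJar(appName: String, bytes: Array[Byte]): Unit = {
  // Stand-in for the DAO's file/database write
  val path = Files.createTempFile(s"$appName-", ".jar")
  Files.write(path, bytes)
  // Keep a hot copy so subsequent reads skip the persistence layer
  cache(appName) = bytes
}

saveJar("wordcount", Array[Byte](0x50, 0x4b)) // 'PK' zip magic bytes; jars are zips
println(cache("wordcount").length)
```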

Pitfalls and Practice

val daoAskTimeout = Timeout(3 seconds)
// TODO: refactor so we don't need Await, instead flatmap into more futures
val resp = Await.result(
  (daoActor ? JobDAOActor.GetLastUploadTime(appName))(daoAskTimeout).mapTo[JobDAOActor.LastUploadTime],
  daoAskTimeout.duration)

case GetLastUploadTime(appName) =>
  sender() ! LastUploadTime(dao.getLastUploadTime(appName))

trait JobDAO {
  def getLastUploadTime(appName: String): Option[DateTime] =
    getApps.get(appName)
}

override def getApps: Map[String, DateTime] = {
  db withSession {
    implicit session =>
      // It's "select appName, max(uploadTime) from jars group by appName";
      // max(uploadTime) is the latest upload time of the jar.
      val query = jars.groupBy { _.appName }.map {
        case (appName, jar) => (appName -> jar.map(_.uploadTime).max.get)
      }
      query.list.map {
        case (appName: String, timestamp: Timestamp) =>
          (appName, convertDateSqlToJoda(timestamp))
      }.toMap
  }
}

Every time a job runs, one of the steps fetches the last upload time for the jar's appName; this value is eventually passed to the executors, which use it to decide whether to pull the jar from the server on the driver.
The pitfall: this is a synchronous, blocking call, and nothing else proceeds until it completes.

That call queries the database, so when jobs are submitted at a high rate, messages pile up in the jobmanager actor's mailbox, asks start timing out, the JVM eventually crashes, and the SparkContext stops.

Improvements:
1. Switched the DAO backend from flat files to a PostgreSQL database.
2. Maintain an in-memory HashMap of appName to last upload time. Whenever a jar is updated in the database, update the map as well; when jobserver restarts, load the latest LastUploadTime per app from the database into the map. Reads then return directly from memory.
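
Improvement 2 is a read-through cache: serve getLastUploadTime from memory and refresh the map on every jar write. A hedged, stdlib-only sketch (java.time.Instant replaces Joda's DateTime, and dbGetLastUploadTime stands in for the Slick query):

```scala
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Counts how often we hit the expensive path (the database)
val dbHits = new AtomicInteger(0)

// Stand-in for "select appName, max(uploadTime) from jars group by appName"
def dbGetLastUploadTime(appName: String): Option[Instant] = {
  dbHits.incrementAndGet()
  Some(Instant.parse("2016-01-01T00:00:00Z"))
}

val lastUpload = mutable.HashMap.empty[String, Instant]

// On jobserver start: warm the map from the database once
lastUpload("wordcount") = dbGetLastUploadTime("wordcount").get

// On every jar upload: write the database, then refresh the map
def onJarUploaded(appName: String, t: Instant): Unit = lastUpload(appName) = t

// On every job start: memory only, no blocking database round-trip
def getLastUploadTime(appName: String): Option[Instant] = lastUpload.get(appName)

(1 to 1000).foreach(_ => getLastUploadTime("wordcount"))
println(dbHits.get) // only the warm-up query touched the database
```

This removes the per-job database query that was backing up the jobmanager's mailbox; the actor's single-threaded message processing makes map access safe without extra locking.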

Practice

We implemented a sparkmanager service in front of Spark JobServer.

Under load testing it runs on the order of ten thousand jobs per hour. Each business line maps to its own context in Spark JobServer: log aggregation, data analysis, traffic validation, and so on, and each can be assigned a different number of instances, with per-instance memory and CPU counts, according to its traffic and data volume.

sparkmanager rate-limits based on the current job count and uses a queue to absorb traffic spikes. If the cluster goes down, sparkmanager detects it, stops consuming messages, and sends an alert email for manual handling.
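
The rate-limiting / peak-shaving idea can be sketched with a bounded in-flight count plus a queue: submissions beyond the limit wait instead of hitting the cluster. JobThrottle and its limits are illustrative, not sparkmanager's actual code:

```scala
import scala.collection.mutable

// At most maxInFlight jobs run concurrently; the rest queue up
class JobThrottle(maxInFlight: Int) {
  private val waiting = mutable.Queue.empty[String]
  private var inFlight = 0

  def submit(jobId: String): String =
    if (inFlight < maxInFlight) { inFlight += 1; s"$jobId started" }
    else { waiting.enqueue(jobId); s"$jobId queued" }

  // Called when a running job completes: promote the next queued job, if any
  def jobFinished(): Option[String] = {
    inFlight -= 1
    if (waiting.nonEmpty) { inFlight += 1; Some(waiting.dequeue()) } else None
  }

  def queuedCount: Int = waiting.size
}

val throttle = new JobThrottle(maxInFlight = 2)
println(throttle.submit("job-1")) // job-1 started
println(throttle.submit("job-2")) // job-2 started
println(throttle.submit("job-3")) // job-3 queued
println(throttle.jobFinished())   // Some(job-3): promoted from the queue
println(throttle.queuedCount)     // 0
```

In production this sits inside a single consumer loop (or an actor), so the counters need no extra synchronization; detection of a dead cluster simply pauses the consumer.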

Log aggregation also involves downloading and uploading logs, so there is an extra layer of state control on top: jobs are managed at the domain level, and each domain can be recovered independently.

A possible next step is to evolve this into an hourly, per-domain stream-processing framework.

Follow sunbiaobiao's WeChat official account.