QQ:359855043资料

本文主要介绍QQ:359855043资料 方法和在新技术下所面对的“挑战”,方便大家深入理解QQ:359855043资料 过程。本文也将分享QQ:359855043资料 所遇到的问题和应对策略,怎么解决怎么做的问题。
通过深入本文可以理解代码原理,进行代码文档的下载,也可以查看相应 Demo 部署效果。

1.DisallowConcurrentExceution

从字面意思来看也就是不允许并发执行

简单的演示一下

    [DisallowConcurrentExecution]     public class TestDisallowConcurrentExectionJob : IJob     {         public async Task Execute(IJobExecutionContext context)         {             await Task.Run(() =>             {                  var nextTime = context.NextFireTimeUtc?.ToLocalTime();                  var currentTime = DateTime.Now;                  Console.WriteLine($"CurrentTime={currentTime},  FireTime={context.ScheduledFireTimeUtc?.ToLocalTime()},  NextTime={nextTime}");              });              Thread.Sleep(10000);          }     }      public class TestDisallowConcurrentExectionScheduler     {         public async static Task Test()         {             var scheduler = await StdSchedulerFactory.GetDefaultScheduler();              await scheduler.Start();              var job = JobBuilder.CreateForAsync<TestDisallowConcurrentExectionJob>()                                 .WithIdentity("TestDisallowConcurrentExectionJob")                                 .UsingJobData("myKey", "myValue")                                 .Build();                var trigger = TriggerBuilder.Create()                                         .WithSimpleSchedule(s =>                                                            s.WithIntervalInSeconds(1)                                                             .RepeatForever())                                         .Build();              await scheduler.ScheduleJob(job, trigger);         }     }

未添加特性的结果

QQ:359855043

 

 

 添加特性的结果

QQ:359855043

 

 

 Quartz默认是十个线程工作线程加一个调度线程,当线程在等待时,其他工作线程也会进来,而加上DiallowConcurrentExection特性时则都会处于等待状态

原理图

QQ:359855043

 

 

 源码解析

通过QuartzSchedulerThread中的Run方法调用AcquireNextTriggers获取下一次的触发器

        public virtual Task<IReadOnlyCollection<IOperableTrigger>> AcquireNextTriggers(             DateTimeOffset noLaterThan,             int maxCount,             TimeSpan timeWindow,             CancellationToken cancellationToken = default)         {             lock (lockObject)             {                 var result = new List<IOperableTrigger>();                  // return empty list if store has no triggers.                 if (timeTriggers.Count == 0)                 {                     return Task.FromResult<IReadOnlyCollection<IOperableTrigger>>(result);                 }                  var acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();                 var excludedTriggers = new HashSet<TriggerWrapper>();                 DateTimeOffset batchEnd = noLaterThan;                  while (true)                 {                     var tw = timeTriggers.FirstOrDefault();                     if (tw == null)                     {                         break;                     }                     if (!timeTriggers.Remove(tw))                     {                         break;                     }                      if (tw.Trigger.GetNextFireTimeUtc() == null)                     {                         continue;                     }                      if (ApplyMisfire(tw))                     {                         if (tw.Trigger.GetNextFireTimeUtc() != null)                         {                             timeTriggers.Add(tw);                         }                         continue;                     }                      if (tw.Trigger.GetNextFireTimeUtc() > batchEnd)                     {                         timeTriggers.Add(tw);                         break;                     }                      // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then                     // put it back into the timeTriggers set and continue to search for next trigger.                     JobKey jobKey = tw.Trigger.JobKey;                     IJobDetail job = jobsByKey[jobKey].JobDetail;                     if (job.ConcurrentExecutionDisallowed)                     {                         if (!acquiredJobKeysForNoConcurrentExec.Add(jobKey))                         {                             excludedTriggers.Add(tw);                             continue; // go to next trigger in store.                         }                     }                      tw.state = InternalTriggerState.Acquired;                     tw.Trigger.FireInstanceId = GetFiredTriggerRecordId();                     IOperableTrigger trig = (IOperableTrigger) tw.Trigger.Clone();                      if (result.Count == 0)                     {                         var now = SystemTime.UtcNow();                         var nextFireTime = tw.Trigger.GetNextFireTimeUtc().GetValueOrDefault(DateTimeOffset.MinValue);                         var max = now > nextFireTime ? now : nextFireTime;                          batchEnd = max + timeWindow;                     }                      result.Add(trig);                      if (result.Count == maxCount)                     {                         break;                     }                 }                  // If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.                 if (excludedTriggers.Count > 0)                 {                     foreach (var excludedTrigger in excludedTriggers)                     {                         timeTriggers.Add(excludedTrigger);                     }                 }                 return Task.FromResult<IReadOnlyCollection<IOperableTrigger>>(result);             }         }

RAMJobStore中的TriggersFired方法

当添加了DisallowConcurrentExecution特性后

先从TimeTriggers中移除Trigger

再把Job添加BlockedJobs中

                    if (job.ConcurrentExecutionDisallowed)                     {                         IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(job.Key);                         foreach (TriggerWrapper ttw in trigs)                         {                             if (ttw.state == InternalTriggerState.Waiting)                             {                                 ttw.state = InternalTriggerState.Blocked;                             }                             if (ttw.state == InternalTriggerState.Paused)                             {                                 ttw.state = InternalTriggerState.PausedAndBlocked;                             }                             timeTriggers.Remove(ttw);                         }                         blockedJobs.Add(job.Key);                     }

RAMJobStore中的TriggeredJobComplete方法

当Job执行完后

如果添加了DisallowConcurrentExecution特性

 

先移除BlockedJobs中Job 再把Trigger添加到TimeTriggers中

  if (jd.ConcurrentExecutionDisallowed)                     {                         blockedJobs.Remove(jd.Key);                         IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);                         foreach (TriggerWrapper ttw in trigs)                         {                             if (ttw.state == InternalTriggerState.Blocked)                             {                                 ttw.state = InternalTriggerState.Waiting;                                 timeTriggers.Add(ttw);                             }                             if (ttw.state == InternalTriggerState.PausedAndBlocked)                             {                                 ttw.state = InternalTriggerState.Paused;                             }                         }                          signaler.SignalSchedulingChange(null, cancellationToken);                     }

2.PersistJobDataAfterExecution

从字面意思来看也就是执行后保留作业数据

简单演示一下

 [PersistJobDataAfterExecution]     public class TestPersistJobDataAfterExecutionJob : IJob     {         public async Task Execute(IJobExecutionContext context)         {             await Task.Run(() =>             {                  var myVaule = context.JobDetail.JobDataMap["myKey"];                  Console.WriteLine(myVaule);                  context.JobDetail.JobDataMap["myKey"] = myVaule + new Random().Next(1,10).ToString();             });         }     }      public class TestPersistJobDataAfterExcutionScheduler     {         public async static Task Test()         {             var scheduler = await StdSchedulerFactory.GetDefaultScheduler();              await scheduler.Start();              var job = JobBuilder.CreateForAsync<TestPersistJobDataAfterExecutionJob>()                                 .WithIdentity("TestPersistJobDataAfterExcutionJob")                                 .UsingJobData("myKey", "myValue")                                 .Build();                var trigger = TriggerBuilder.Create()                                        .WithSimpleSchedule(s =>                                                            s.WithIntervalInSeconds(1)                                                             .RepeatForever())                                        .Build();              await scheduler.ScheduleJob(job, trigger);         }     }

未加特性的结果

QQ:359855043

 

 

 加特性的结果

QQ:359855043

 

 

 原理图

 

 

 

 QQ:359855043

 

 

 

 源码解析

QuartzSchedulerThread中的Run方法

                                JobRunShell shell;                                 try                                 {                                     shell = qsRsrcs.JobRunShellFactory.CreateJobRunShell(bndle);                                     await shell.Initialize(qs, CancellationToken.None).ConfigureAwait(false);                                 }                                 catch (SchedulerException)                                 {                                     await qsRsrcs.JobStore.TriggeredJobComplete(trigger, bndle.JobDetail, SchedulerInstruction.SetAllJobTriggersError, CancellationToken.None).ConfigureAwait(false);                                     continue;                                 }                                  var threadPoolRunResult = qsRsrcs.ThreadPool.RunInThread(() => shell.Run(CancellationToken.None));                                 if (threadPoolRunResult == false)                                 {                                     // this case should never happen, as it is indicative of the                                     // scheduler being shutdown or a bug in the thread pool or                                     // a thread pool being used concurrently - which the docs                                     // say not to do...                                     Log.Error("ThreadPool.RunInThread() returned false!");                                     await qsRsrcs.JobStore.TriggeredJobComplete(trigger, bndle.JobDetail, SchedulerInstruction.SetAllJobTriggersError, CancellationToken.None).ConfigureAwait(false);                                 }

JobRunShell初始化方法

 public virtual async Task Initialize(             QuartzScheduler sched,             CancellationToken cancellationToken = default)         {             qs = sched;              IJob job;             IJobDetail jobDetail = firedTriggerBundle.JobDetail;              try             {                 job = sched.JobFactory.NewJob(firedTriggerBundle, scheduler);             }             catch (SchedulerException se)             {                 await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);                 throw;             }             catch (Exception e)             {                 SchedulerException se = new SchedulerException($"Problem instantiating type '{jobDetail.JobType.FullName}'", e);                 await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);                 throw se;             }              jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);         }

SimpleJobFactory中的NewJob函数可以看出Job是无状态的直接通过反射创建的

    public virtual IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)         {             IJobDetail jobDetail = bundle.JobDetail;             Type jobType = jobDetail.JobType;             try             {                 if (log.IsDebugEnabled())                 {                     log.Debug($"Producing instance of Job '{jobDetail.Key}', class={jobType.FullName}");                 }                  return ObjectUtils.InstantiateType<IJob>(jobType);             }             catch (Exception e)             {                 SchedulerException se = new SchedulerException($"Problem instantiating class '{jobDetail.JobType.FullName}'", e);                 throw se;             }         }

JobRunShell中Run方法将JobExecutionContextImpl塞给了Execute方法

                   private JobExecutionContextImpl jec;                                      // Execute the job                     try                     {                         if (log.IsDebugEnabled())                         {                             log.Debug("Calling Execute on job " + jobDetail.Key);                         }                          await job.Execute(jec).ConfigureAwait(false);                          endTime = SystemTime.UtcNow();                     }

JobRunShell中Run方法调用的NotifyJobStoreJobComplete函数

await qs.NotifyJobStoreJobComplete(trigger, jobDetail, instCode, cancellationToken).ConfigureAwait(false);

JobRunShell中的NotifyJobStoreJobComplete 可以看出调用了JobStore.TriggeredJobComplete

 public virtual Task NotifyJobStoreJobComplete(             IOperableTrigger trigger,             IJobDetail detail,             SchedulerInstruction instCode,             CancellationToken cancellationToken = default)         {             return resources.JobStore.TriggeredJobComplete(trigger, detail, instCode, cancellationToken);         }

Quartz.NET中StdScheduleFactory如果未指定JobStore则默认RAMJobStore

从而可以看出RAMJobStore中通过TriggerJobComplete方法来检查是否有PersistJobDataAfterExecution特性

如果有通过MemberwiseClone函数克隆出数据来再通过JobBuilder来构建一个JobDetail

                  if (jobDetail.PersistJobDataAfterExecution)                     {                         JobDataMap newData = jobDetail.JobDataMap;                         if (newData != null)                         {                             newData = (JobDataMap) newData.Clone();                             newData.ClearDirtyFlag();                         }                         jd = jd.GetJobBuilder().SetJobData(newData).Build();                         jw.JobDetail = jd;                     }                     if (jd.ConcurrentExecutionDisallowed)                     {                         blockedJobs.Remove(jd.Key);                         IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);                         foreach (TriggerWrapper ttw in trigs)                         {                             if (ttw.state == InternalTriggerState.Blocked)                             {                                 ttw.state = InternalTriggerState.Waiting;                                 timeTriggers.Add(ttw);                             }                             if (ttw.state == InternalTriggerState.PausedAndBlocked)                             {                                 ttw.state = InternalTriggerState.Paused;                             }                         }                          signaler.SignalSchedulingChange(null, cancellationToken);                     }

最终会通过JobRunShell中的Run方法中的ReturnJob方法 返回Job

                qs.RemoveInternalSchedulerListener(this);                 if (jec != null)                 {                     if (jec.JobInstance != null)                     {                         qs.JobFactory.ReturnJob(jec.JobInstance);                     }                      jec.Dispose();                 }

 

        public virtual void ReturnJob(IJob job)         {             var disposable = job as IDisposable;             disposable?.Dispose();         }

 

QQ:359855043资料部分资料来自网络,侵权毕设源码联系删除

区块链毕设网(www.qklbishe.com)全网最靠谱的原创区块链毕设代做网站
部分资料来自网络,侵权联系删除!
资源收费仅为搬运整理打赏费用,用户自愿支付 !
qklbishe.com区块链毕设代做网专注|以太坊fabric-计算机|java|毕业设计|代做平台 » QQ:359855043资料

提供最优质的资源集合

立即查看 了解详情