def compute(
    time: Long, // in milliseconds
    numElements: Long,
    processingDelay: Long, // in milliseconds
    schedulingDelay: Long // in milliseconds
  ): Option[Double] = {
  logTrace(s"\ntime = $time, # records = $numElements, " +
    s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
  this.synchronized {
    if (time > latestTime && numElements > 0 && processingDelay > 0) {

      // in seconds, should be close to batchDuration
      val delaySinceUpdate = (time - latestTime).toDouble / 1000

      // in elements/second
      val processingRate = numElements.toDouble / processingDelay * 1000

      // In our system `error` is the difference between the desired rate and the measured rate
      // based on the latest batch information. We consider the desired rate to be latest rate,
      // which is what this estimator calculated for the previous batch.
      // in elements/second
      val error = latestRate - processingRate

      // The error integral, based on schedulingDelay as an indicator for accumulated errors.
      // A scheduling delay s corresponds to s * processingRate overflowing elements. Those
      // are elements that couldn't be processed in previous batches, leading to this delay.
      // In the following, we assume the processingRate didn't change too much.
      // From the number of overflowing elements we can calculate the rate at which they would be
      // processed by dividing it by the batch interval. This rate is our "historical" error,
      // or integral part, since if we subtracted this rate from the previous "calculated rate",
      // there wouldn't have been any overflowing elements, and the scheduling delay would have
      // been zero.
      // (in elements/second)
      val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
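To make the units above concrete, here is a small standalone sketch that plugs hypothetical batch figures into the same formulas. All of the input values (batch interval, latestRate, numElements, processingDelay, schedulingDelay) are made up for illustration; only the arithmetic mirrors the compute() code above.

    // Standalone illustration, not Spark code: every input value below is hypothetical.
    object PidTermsExample {
      def main(args: Array[String]): Unit = {
        val batchIntervalMillis = 1000L   // assume a 1-second batch interval
        val latestRate          = 1000.0  // rate (elements/s) the estimator produced for the previous batch

        // Hypothetical figures reported for the latest completed batch.
        val numElements     = 800L        // records processed
        val processingDelay = 1000L       // ms spent processing the batch
        val schedulingDelay = 500L        // ms the batch waited in the queue before it could start

        // Same formulas as in compute() above.
        val processingRate  = numElements.toDouble / processingDelay * 1000                   // 800 elements/s
        val error           = latestRate - processingRate                                     // 200 elements/s
        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // 400 elements/s

        println(s"processingRate = $processingRate, error = $error, historicalError = $historicalError")
      }
    }

With a 500 ms scheduling delay, roughly 400 elements per second of backlog has accumulated, so the integral term pulls the new rate down further than the proportional term (200 elements/s) alone would.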
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    // This is where the rate limit is applied: block until a push permit is available.
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
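The throttling itself happens inside waitToPush(): the producer thread blocks until the limiter grants a permit, and Spark's RateLimiter delegates this to a Guava RateLimiter whose rate the estimator can later revise. The class below is only a minimal sketch of that idea, not Spark's RateLimiter; its name, the updateRate method, and the permit rates are illustrative assumptions.

    import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

    // Illustrative stand-in for the rate limiting behind waitToPush(); not Spark code.
    class SimpleRateLimiter(permitsPerSecond: Double) {
      private val limiter = GuavaRateLimiter.create(permitsPerSecond)

      // Blocks the calling thread until a permit is available, capping the push rate.
      def waitToPush(): Unit = limiter.acquire()

      // Lets the rate be revised later, e.g. after the estimator publishes a new rate.
      def updateRate(newRate: Double): Unit =
        if (newRate > 0) limiter.setRate(newRate)
    }

    object SimpleRateLimiterDemo {
      def main(args: Array[String]): Unit = {
        val limiter = new SimpleRateLimiter(5.0) // at most ~5 pushes per second (arbitrary)
        (1 to 10).foreach { i =>
          limiter.waitToPush()
          println(s"pushed record $i at ${System.currentTimeMillis()} ms")
        }
      }
    }

Running the demo shows records being admitted at roughly 200 ms intervals, which is exactly the effect addData() relies on: producers are slowed down to whatever rate the backpressure mechanism currently allows.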