Commits
Jannik Nørgaard Steen authored 1196f080b57
20 20 | import java.util.concurrent.ScheduledExecutorService; |
21 21 | import java.util.concurrent.TimeUnit; |
22 22 | import java.util.concurrent.atomic.AtomicLong; |
23 23 | import java.util.stream.Collectors; |
24 24 | import java.util.stream.IntStream; |
25 25 | |
26 26 | /** |
27 27 | * Optional system properties: |
28 28 | * <ul> |
29 29 | * <li>test.nas.poll.initial.delay (default 0) - denotes the initial delay in milliseconds before starting a poll</li> |
30 - | * <li>test.nas.poll.period (default 2000) - denotes the interval in milliseconds in which to poll the NAS for JobIds</li> |
30 + | * <li>test.nas.poll.period (default 500) - denotes the interval in milliseconds in which to poll the NAS for JobIds</li> |
31 31 | * <li>test.feed.initial.delay (default 0) - denotes the initial delay in milliseconds before starting to check the queue</li> |
32 32 | * <li>test.feed.period (default 500) - denotes the interval in milliseconds in which to check and perhaps feed the queue with data</li> |
33 - | * <li>test.transit.queue.max.drain (default 20)- denotes the maximum number of entities to drain from the transit queue at each attempt</li> |
33 + | * <li>test.transit.queue.max.drain (default 1000)- denotes the maximum number of entities to drain from the transit queue at each attempt</li> |
34 34 | * <li>test.stats.initial.delay (default 60) - denotes the initial delay in seconds before starting to show stats</li> |
35 35 | * <li>test.stats.period (default 60) - denotes the number of seconds between each output of stats</li> |
36 36 | * |
37 37 | * </ul> |
38 38 | * |
39 39 | * @author jns, Arosii Information Systems A/S |
40 40 | */ |
41 41 | public class Conductor { |
42 42 | |
43 43 | private final CbsClient cbsClient; |
90 90 | .rangeClosed(1, conf.numThreads()) |
91 91 | .forEach(i -> |
92 92 | cbs.scheduleAtFixedRate(new Taker(feederQueue, transitQueue), |
93 93 | new Random().nextInt((int) (conf.requestWindowSize() + 1)), |
94 94 | conf.requestWindowSize(), |
95 95 | TimeUnit.MILLISECONDS) |
96 96 | ); |
97 97 | |
98 98 | nas.scheduleAtFixedRate(new Poller(transitQueue, nasService, subscription), |
99 99 | Long.getLong("test.nas.poll.initial.delay", 0), |
100 - | Long.getLong("test.nas.poll.period", 2000), |
100 + | Long.getLong("test.nas.poll.period", 500), |
101 101 | TimeUnit.MILLISECONDS); |
102 102 | feed.scheduleAtFixedRate(new Feeder(feederQueue), |
103 103 | Long.getLong("test.feed.initial.delay",0), |
104 104 | Long.getLong("test.feed.period", 500), |
105 105 | TimeUnit.MILLISECONDS); |
106 106 | stat.scheduleAtFixedRate(new Stats(), |
107 107 | Long.getLong("test.stats.initial.delay", 5), |
108 108 | Long.getLong("test.stats.period", 5), |
109 109 | TimeUnit.SECONDS); |
110 110 | logger.info("Started " + conf.numThreads() + " CBS call threads."); |
150 150 | public Poller(LinkedTransferQueue<TestDriver.TransitElement> transitQueue, NasService nasService, Subscription subscription) { |
151 151 | this.transitQueue = transitQueue; |
152 152 | this.nasService = nasService; |
153 153 | this.subscription = subscription; |
154 154 | } |
155 155 | |
156 156 | |
157 157 | public void run() { |
158 158 | final List<String> jobIds = nasService.pullJobIds(subscription); |
159 159 | final List<TestDriver.TransitElement> inTransit = new ArrayList<>(); |
160 - | transitQueue.drainTo(inTransit, Integer.getInteger("test.transit.queue.max.drain", 100)); |
160 + | transitQueue.drainTo(inTransit, Integer.getInteger("test.transit.queue.max.drain", 1000)); |
161 161 | |
162 162 | final List<TestDriver.TransitElement> inCache = inTransit.stream() |
163 163 | .filter(transitElement -> jobIds.stream() |
164 164 | .anyMatch(jobId -> transitElement.getJobId().equals(jobId))) |
165 165 | .collect(Collectors.toList()); |
166 166 | inTransit.removeAll(inCache); |
167 167 | transitQueue.addAll(inTransit); // feed back all not yet in cache |
168 168 | } |
169 169 | } |
170 170 | |
189 189 | } |
190 190 | } |
191 191 | |
192 192 | /* output statistics */ |
193 193 | private class Stats implements Runnable { |
194 194 | |
195 195 | public void run() { |
196 196 | final long served = servedFromCache.get(); |
197 197 | final long processed = processedForCaching.get(); |
198 198 | final long total = totalProcessed.get(); |
199 - | final double hitPrc = Math.round((((processed == 0 ? 100d : (1d / ((double)served / (double)processed)) * 100d) * 100d)) / 100d); |
200 - | final double missPrc = Math.round((((served == 0 ? 100d : (1d - ((double)served / (double)processed)) * 100d) * 100d)) / 100d); |
199 + | final double hitPrc; |
200 + | final double missPrc; |
201 + | if (total == 0) { |
202 + | hitPrc = Double.NaN; |
203 + | missPrc = Double.NaN; |
204 + | } else { |
205 + | hitPrc = Math.round(((((double)served / (double)total) * 100d) * 100d) / 100d); |
206 + | missPrc = Math.round(((((double)processed / (double)total) * 100d) * 100d) / 100d); |
207 + | } |
201 208 | logger.info("Statistics [h:" + hitPrc + " m:" + missPrc + ",total-processed:" + total + ",served-from-cache:" + served + ",processed-for-caching:" + processed + ",feeder-queue:" + feederQueue.size() + ",transit-queue:" + transitQueue.size() + "]"); |
202 209 | } |
203 210 | } |
204 211 | } |