下面列出了org.joda.time.Instant#equals ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) {
currentWatermark = new Instant(record.getTimestamp());
} else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
// This is the first record and it does not have LOG_APPEND_TIME.
// Most likely the topic is not configured correctly.
throw new IllegalStateException(
String.format(
"LogAppendTimePolicy policy is enabled in reader, but Kafka record's timestamp type "
+ "is LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. "
+ "Actual timestamp type is '%s'.",
record.getTopic(), record.getTimestampType()));
}
return currentWatermark;
}
@Override
public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) {
currentWatermark = new Instant(record.getTimestamp());
} else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
// This is the first record and it does not have LOG_APPEND_TIME.
// Most likely the topic is not configured correctly.
throw new IllegalStateException(
String.format(
"LogAppendTimePolicy policy is enabled in reader, but Kafka record's timestamp type "
+ "is LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. "
+ "Actual timestamp type is '%s'.",
record.getTopic(), record.getTimestampType()));
}
return currentWatermark;
}
private boolean flushTimers(long watermark) {
if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
try {
Instant watermarkInstant = new Instant(watermark);
timerInternals.advanceInputWatermark(watermarkInstant);
if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
timerInternals.advanceProcessingTime(watermarkInstant);
timerInternals.advanceSynchronizedProcessingTime(watermarkInstant);
}
fireEligibleTimers(timerInternals);
} catch (Exception e) {
throw new RuntimeException("Failed advancing processing time", e);
}
}
return outputManager.tryFlush();
}
private Visit findGoal(Instant timestamp, Iterable<Visit> goals) {
for (Visit goal : goals) {
if (timestamp.equals(goal.timestamp())) {
return goal;
}
}
return null;
}
public static String formatTime(Instant timestamp) {
if (timestamp == null) {
return "null";
} else if (timestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
return "TIMESTAMP_MIN_VALUE";
} else if (timestamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
return "TIMESTAMP_MAX_VALUE";
} else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) {
return "END_OF_GLOBAL_WINDOW";
} else {
return timestamp.toString(TIME_FMT);
}
}
/**
* Calculate averages for any windows which can now be retired. Also prune entries which can no
* longer contribute to any future window.
*/
private void prune(Instant newWindowStart) {
while (!newWindowStart.equals(windowStart)) {
averages(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
winningPricesByCategory.removeIf(
categoryPriceTimestampedValue ->
categoryPriceTimestampedValue.getTimestamp().isBefore(windowStart));
if (winningPricesByCategory.isEmpty()) {
windowStart = newWindowStart;
}
}
}
/** Retire active windows until we've reached {@code newWindowStart}. */
private void retireWindows(Instant newWindowStart) {
while (!newWindowStart.equals(windowStart)) {
NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart);
// Count bids in the window (windowStart, windowStart + size].
countBids(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
// Advance the window.
windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
// Retire bids which will never contribute to a future window.
if (!retireBids(windowStart)) {
// Can fast forward to latest window since no more outstanding bids.
windowStart = newWindowStart;
}
}
}
@Override
protected void run() {
TimestampedValue<Event> timestampedEvent = nextInput();
if (timestampedEvent == null) {
// Capture all remaining bids in results.
retireWindow(lastTimestamp);
allDone();
return;
}
Event event = timestampedEvent.getValue();
if (event.bid == null) {
// Ignore non-bid events.
return;
}
lastTimestamp = timestampedEvent.getTimestamp();
Instant newWindowStart =
windowStart(
Duration.standardSeconds(configuration.windowSizeSec),
Duration.standardSeconds(configuration.windowSizeSec),
lastTimestamp);
if (!newWindowStart.equals(windowStart)) {
// Capture highest priced bids in current window and retire it.
retireWindow(lastTimestamp);
windowStart = newWindowStart;
}
// Keep only the highest bids.
captureBid(event.bid);
}
/**
* Formats a {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
* whether the timestamp is the end of the global window or one of the distinguished values {@link
* #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MIN_VALUE}.
*/
public static String formatTimestamp(Instant timestamp) {
if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";
} else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) {
return timestamp.toString() + " (TIMESTAMP_MAX_VALUE)";
} else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) {
return timestamp.toString() + " (end of global window)";
} else {
return timestamp.toString();
}
}
@Override
public boolean isForWindow(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
gcTime = gcTime.plus(GC_DELAY_MS);
return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
if (timeDomain == TimeDomain.EVENT_TIME && timestamp.equals(window.maxTimestamp())) {
// Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
// state transitions.
return;
}
timers.deleteTimer(timestamp, timeDomain);
}
@Override
public boolean isForWindow(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy).plus(1);
return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
}
@Override
public void add(Instant value) {
final Instant currentValue = readInternal();
final Instant combinedValue =
currentValue == null ? value : timestampCombiner.combine(currentValue, value);
if (!combinedValue.equals(currentValue)) {
writeInternal(combinedValue);
}
}
@Override
public void run() {
TimestampedValue<Event> timestampedEvent = nextInput();
if (timestampedEvent == null) {
allDone();
return;
}
Event event = timestampedEvent.getValue();
if (event.bid != null) {
// Ignore bid events.
// Keep looking for next events.
return;
}
Instant timestamp = timestampedEvent.getTimestamp();
Instant newWindowStart =
windowStart(
Duration.standardSeconds(configuration.windowSizeSec),
Duration.standardSeconds(configuration.windowSizeSec),
timestamp);
if (!newWindowStart.equals(windowStart)) {
// Retire this window.
retirePersons();
retireAuctions();
windowStart = newWindowStart;
}
if (event.newAuction != null) {
// Join new auction with existing person, if any.
Person person = newPersons.get(event.newAuction.seller);
if (person != null) {
addResult(event.newAuction, person, timestamp);
} else {
// Remember auction for future new people.
newAuctions.put(event.newAuction.seller, event.newAuction);
}
} else { // event is not an auction, nor a bid, so it is a person
// Join new person with existing auctions.
for (Auction auction : newAuctions.get(event.newPerson.id)) {
addResult(auction, event.newPerson, timestamp);
}
// We'll never need these auctions again.
newAuctions.removeAll(event.newPerson.id);
// Remember person for future auctions.
newPersons.put(event.newPerson.id, event.newPerson);
}
}
/**
* Returns total size of all records that remain in Kinesis stream. The size is estimated taking
* into account size of the records that were added to the stream after timestamp of the most
* recent record returned by the reader. If no records have yet been retrieved from the reader
* {@link UnboundedSource.UnboundedReader#BACKLOG_UNKNOWN} is returned. When currently processed
* record is not further behind than {@link #upToDateThreshold} then this method returns 0.
*
* <p>The method can over-estimate size of the records for the split as it reports the backlog
* across all shards. This can lead to unnecessary decisions to scale up the number of workers but
* will never fail to scale up when this is necessary due to backlog size.
*
* @see <a href="https://issues.apache.org/jira/browse/BEAM-9439">BEAM-9439</a>
*/
@Override
public long getSplitBacklogBytes() {
Instant latestRecordTimestamp = shardReadersPool.getLatestRecordTimestamp();
if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
LOG.debug("Split backlog bytes for stream {} unknown", source.getStreamName());
return UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
}
if (latestRecordTimestamp.plus(upToDateThreshold).isAfterNow()) {
LOG.debug(
"Split backlog bytes for stream {} with latest record timestamp {}: 0 (latest record timestamp is up-to-date with threshold of {})",
source.getStreamName(),
latestRecordTimestamp,
upToDateThreshold);
return 0L;
}
if (backlogBytesLastCheckTime.plus(backlogBytesCheckThreshold).isAfterNow()) {
LOG.debug(
"Split backlog bytes for {} stream with latest record timestamp {}: {} (cached value)",
source.getStreamName(),
latestRecordTimestamp,
lastBacklogBytes);
return lastBacklogBytes;
}
try {
lastBacklogBytes = kinesis.getBacklogBytes(source.getStreamName(), latestRecordTimestamp);
backlogBytesLastCheckTime = Instant.now();
} catch (TransientKinesisException e) {
LOG.warn(
"Transient exception occurred during backlog estimation for stream {}.",
source.getStreamName(),
e);
}
LOG.info(
"Split backlog bytes for {} stream with {} latest record timestamp: {}",
source.getStreamName(),
latestRecordTimestamp,
lastBacklogBytes);
return lastBacklogBytes;
}
/**
* Factory method returns instances of the GJ cutover chronology. Any
* cutover date may be specified.
*
* @param zone the time zone to use, null is default
* @param gregorianCutover the cutover to use, null means default
* @param minDaysInFirstWeek minimum number of days in first week of the year; default is 4
*/
public static synchronized GJChronology getInstance(
DateTimeZone zone,
ReadableInstant gregorianCutover,
int minDaysInFirstWeek) {
zone = DateTimeUtils.getZone(zone);
Instant cutoverInstant;
if (gregorianCutover == null) {
cutoverInstant = DEFAULT_CUTOVER;
} else {
cutoverInstant = gregorianCutover.toInstant();
}
GJChronology chrono;
ArrayList<GJChronology> chronos = cCache.get(zone);
if (chronos == null) {
chronos = new ArrayList<GJChronology>(2);
cCache.put(zone, chronos);
} else {
for (int i=chronos.size(); --i>=0; ) {
chrono = chronos.get(i);
if (minDaysInFirstWeek == chrono.getMinimumDaysInFirstWeek() &&
cutoverInstant.equals(chrono.getGregorianCutover())) {
return chrono;
}
}
}
if (zone == DateTimeZone.UTC) {
chrono = new GJChronology
(JulianChronology.getInstance(zone, minDaysInFirstWeek),
GregorianChronology.getInstance(zone, minDaysInFirstWeek),
cutoverInstant);
} else {
chrono = getInstance(DateTimeZone.UTC, cutoverInstant, minDaysInFirstWeek);
chrono = new GJChronology
(ZonedChronology.getInstance(chrono, zone),
chrono.iJulianChronology,
chrono.iGregorianChronology,
chrono.iCutoverInstant);
}
chronos.add(chrono);
return chrono;
}
/**
* Factory method returns instances of the GJ cutover chronology. Any
* cutover date may be specified.
*
* @param zone the time zone to use, null is default
* @param gregorianCutover the cutover to use, null means default
* @param minDaysInFirstWeek minimum number of days in first week of the year; default is 4
*/
public static synchronized GJChronology getInstance(
DateTimeZone zone,
ReadableInstant gregorianCutover,
int minDaysInFirstWeek) {
zone = DateTimeUtils.getZone(zone);
Instant cutoverInstant;
if (gregorianCutover == null) {
cutoverInstant = DEFAULT_CUTOVER;
} else {
cutoverInstant = gregorianCutover.toInstant();
}
GJChronology chrono;
ArrayList<GJChronology> chronos = cCache.get(zone);
if (chronos == null) {
chronos = new ArrayList<GJChronology>(2);
cCache.put(zone, chronos);
} else {
for (int i=chronos.size(); --i>=0; ) {
chrono = chronos.get(i);
if (minDaysInFirstWeek == chrono.getMinimumDaysInFirstWeek() &&
cutoverInstant.equals(chrono.getGregorianCutover())) {
return chrono;
}
}
}
if (zone == DateTimeZone.UTC) {
chrono = new GJChronology
(JulianChronology.getInstance(zone, minDaysInFirstWeek),
GregorianChronology.getInstance(zone, minDaysInFirstWeek),
cutoverInstant);
} else {
chrono = getInstance(DateTimeZone.UTC, cutoverInstant, minDaysInFirstWeek);
chrono = new GJChronology
(ZonedChronology.getInstance(chrono, zone),
chrono.iJulianChronology,
chrono.iGregorianChronology,
chrono.iCutoverInstant);
}
chronos.add(chrono);
return chrono;
}
/**
* Factory method returns instances of the GJ cutover chronology. Any
* cutover date may be specified.
*
* @param zone the time zone to use, null is default
* @param gregorianCutover the cutover to use, null means default
* @param minDaysInFirstWeek minimum number of days in first week of the year; default is 4
*/
public static synchronized GJChronology getInstance(
DateTimeZone zone,
ReadableInstant gregorianCutover,
int minDaysInFirstWeek) {
zone = DateTimeUtils.getZone(zone);
Instant cutoverInstant;
if (gregorianCutover == null) {
cutoverInstant = DEFAULT_CUTOVER;
} else {
cutoverInstant = gregorianCutover.toInstant();
}
GJChronology chrono;
ArrayList<GJChronology> chronos = cCache.get(zone);
if (chronos == null) {
chronos = new ArrayList<GJChronology>(2);
cCache.put(zone, chronos);
} else {
for (int i=chronos.size(); --i>=0; ) {
chrono = chronos.get(i);
if (minDaysInFirstWeek == chrono.getMinimumDaysInFirstWeek() &&
cutoverInstant.equals(chrono.getGregorianCutover())) {
return chrono;
}
}
}
if (zone == DateTimeZone.UTC) {
chrono = new GJChronology
(JulianChronology.getInstance(zone, minDaysInFirstWeek),
GregorianChronology.getInstance(zone, minDaysInFirstWeek),
cutoverInstant);
} else {
chrono = getInstance(DateTimeZone.UTC, cutoverInstant, minDaysInFirstWeek);
chrono = new GJChronology
(ZonedChronology.getInstance(chrono, zone),
chrono.iJulianChronology,
chrono.iGregorianChronology,
chrono.iCutoverInstant);
}
chronos.add(chrono);
return chrono;
}
/**
* Factory method returns instances of the GJ cutover chronology. Any
* cutover date may be specified.
*
* @param zone the time zone to use, null is default
* @param gregorianCutover the cutover to use, null means default
* @param minDaysInFirstWeek minimum number of days in first week of the year; default is 4
*/
public static synchronized GJChronology getInstance(
DateTimeZone zone,
ReadableInstant gregorianCutover,
int minDaysInFirstWeek) {
zone = DateTimeUtils.getZone(zone);
Instant cutoverInstant;
if (gregorianCutover == null) {
cutoverInstant = DEFAULT_CUTOVER;
} else {
cutoverInstant = gregorianCutover.toInstant();
}
GJChronology chrono;
synchronized (cCache) {
ArrayList<GJChronology> chronos = cCache.get(zone);
if (chronos == null) {
chronos = new ArrayList<GJChronology>(2);
cCache.put(zone, chronos);
} else {
for (int i = chronos.size(); --i >= 0;) {
chrono = chronos.get(i);
if (minDaysInFirstWeek == chrono.getMinimumDaysInFirstWeek() &&
cutoverInstant.equals(chrono.getGregorianCutover())) {
return chrono;
}
}
}
if (zone == DateTimeZone.UTC) {
chrono = new GJChronology
(JulianChronology.getInstance(zone, minDaysInFirstWeek),
GregorianChronology.getInstance(zone, minDaysInFirstWeek),
cutoverInstant);
} else {
chrono = getInstance(DateTimeZone.UTC, cutoverInstant, minDaysInFirstWeek);
chrono = new GJChronology
(ZonedChronology.getInstance(chrono, zone),
chrono.iJulianChronology,
chrono.iGregorianChronology,
chrono.iCutoverInstant);
}
chronos.add(chrono);
}
return chrono;
}
/**
* Factory method returns instances of the GJ cutover chronology. Any
* cutover date may be specified.
*
* @param zone the time zone to use, null is default
* @param gregorianCutover the cutover to use, null means default
* @param minDaysInFirstWeek minimum number of days in first week of the year; default is 4
*/
public static synchronized GJChronology getInstance(
DateTimeZone zone,
ReadableInstant gregorianCutover,
int minDaysInFirstWeek) {
zone = DateTimeUtils.getZone(zone);
Instant cutoverInstant;
if (gregorianCutover == null) {
cutoverInstant = DEFAULT_CUTOVER;
} else {
cutoverInstant = gregorianCutover.toInstant();
LocalDate cutoverDate = new LocalDate(cutoverInstant.getMillis(), GregorianChronology.getInstance(zone));
if (cutoverDate.getYear() <= 0) {
throw new IllegalArgumentException("Cutover too early. Must be on or after 0001-01-01.");
}
}
GJChronology chrono;
synchronized (cCache) {
ArrayList<GJChronology> chronos = cCache.get(zone);
if (chronos == null) {
chronos = new ArrayList<GJChronology>(2);
cCache.put(zone, chronos);
} else {
for (int i = chronos.size(); --i >= 0;) {
chrono = chronos.get(i);
if (minDaysInFirstWeek == chrono.getMinimumDaysInFirstWeek() &&
cutoverInstant.equals(chrono.getGregorianCutover())) {
return chrono;
}
}
}
if (zone == DateTimeZone.UTC) {
chrono = new GJChronology
(JulianChronology.getInstance(zone, minDaysInFirstWeek),
GregorianChronology.getInstance(zone, minDaysInFirstWeek),
cutoverInstant);
} else {
chrono = getInstance(DateTimeZone.UTC, cutoverInstant, minDaysInFirstWeek);
chrono = new GJChronology
(ZonedChronology.getInstance(chrono, zone),
chrono.iJulianChronology,
chrono.iGregorianChronology,
chrono.iCutoverInstant);
}
chronos.add(chrono);
}
return chrono;
}