From e6f482a9d31a0a9f258e5acc5354eea9b228299b Mon Sep 17 00:00:00 2001 From: phasenraum2010 Date: Sun, 6 Aug 2017 18:44:19 +0200 Subject: [PATCH] working on #229, working on #228 --- .../woehlke/twitterwall/ScheduledTasks.java | 12 ++++ .../conf/properties/SchedulerProperties.java | 11 +++ .../oodm/entities/parts/TaskType.java | 3 +- .../scheduled/mq/endpoint/AsyncStartTask.java | 2 + .../endpoint/FindTweetsToRemoveSplitter.java | 12 ++++ .../mq/endpoint/impl/AsyncStartTaskImpl.java | 6 ++ .../impl/FindTweetsToRemoveSplitterImpl.java | 67 +++++++++++++++++++ .../mq/endpoint/impl/StartTaskImpl.java | 6 ++ src/main/resources/application.yml | 1 + src/main/resources/integration.xml | 45 +++++++++++++ .../resources/templates/tickersymbol/all.html | 2 +- .../twitterwall/ScheduledTasksTest.java | 5 ++ .../mq/endpoint/AbstractMqEndpointTest.java | 32 ++++++++- .../mq/endpoint/AsyncStartTaskTest.java | 2 + .../mq/endpoint/AsyncStartTaskTestImpl.java | 20 ++++++ .../mq/endpoint/StartTaskTestImpl.java | 18 +++++ 16 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/FindTweetsToRemoveSplitter.java create mode 100644 src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/FindTweetsToRemoveSplitterImpl.java diff --git a/src/main/java/org/woehlke/twitterwall/ScheduledTasks.java b/src/main/java/org/woehlke/twitterwall/ScheduledTasks.java index df556fa8..341cbfa4 100644 --- a/src/main/java/org/woehlke/twitterwall/ScheduledTasks.java +++ b/src/main/java/org/woehlke/twitterwall/ScheduledTasks.java @@ -60,6 +60,16 @@ public void fetchUsersFromDefinedUserList(){ } } + + @Scheduled(fixedRate = FIXED_RATE_FOR_SCHEDULAR_REMOVE_OLD_DATA_FROM_STORAGE) + public void removeOldDataFromStorage(){ + String msg = "remove Old Data From Storage: "; + if(schedulerProperties.getRemoveOldDataFromStorageAllow() && !schedulerProperties.getSkipFortesting()) { + Task task = asyncStartTask.removeOldDataFromStorage(); + log.info(msg+ "SCHEDULED: task "+task.getUniqueId()); + } + } + @Autowired public ScheduledTasks(SchedulerProperties schedulerProperties, AsyncStartTask mqAsyncStartTask) { this.schedulerProperties = schedulerProperties; @@ -82,6 +92,8 @@ public ScheduledTasks(SchedulerProperties schedulerProperties, AsyncStartTask mq private final static long FIXED_RATE_FOR_SCHEDULAR_FETCH_USER_LIST = ZWOELF_STUNDEN; + private final static long FIXED_RATE_FOR_SCHEDULAR_REMOVE_OLD_DATA_FROM_STORAGE = EINE_STUNDE; + private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); private final SchedulerProperties schedulerProperties; diff --git a/src/main/java/org/woehlke/twitterwall/conf/properties/SchedulerProperties.java b/src/main/java/org/woehlke/twitterwall/conf/properties/SchedulerProperties.java index 49ed6c79..9dc616c4 100644 --- a/src/main/java/org/woehlke/twitterwall/conf/properties/SchedulerProperties.java +++ b/src/main/java/org/woehlke/twitterwall/conf/properties/SchedulerProperties.java @@ -36,6 +36,9 @@ public class SchedulerProperties { @NotNull private String fetchUserListName; + @NotNull + private Boolean removeOldDataFromStorageAllow; + public Boolean getAllowFetchTweetsFromTwitterSearch() { return allowFetchTweetsFromTwitterSearch; } @@ -99,4 +102,12 @@ public String getFetchUserListName() { public void setFetchUserListName(String fetchUserListName) { this.fetchUserListName = fetchUserListName; } + + public Boolean getRemoveOldDataFromStorageAllow() { + return removeOldDataFromStorageAllow; + } + + public void setRemoveOldDataFromStorageAllow(Boolean removeOldDataFromStorageAllow) { + this.removeOldDataFromStorageAllow = removeOldDataFromStorageAllow; + } } diff --git a/src/main/java/org/woehlke/twitterwall/oodm/entities/parts/TaskType.java b/src/main/java/org/woehlke/twitterwall/oodm/entities/parts/TaskType.java index 3488d895..63b7749a 100644 --- a/src/main/java/org/woehlke/twitterwall/oodm/entities/parts/TaskType.java +++ b/src/main/java/org/woehlke/twitterwall/oodm/entities/parts/TaskType.java @@ -14,5 +14,6 @@ public enum TaskType { CONTROLLER_CREATE_IMPRINT_USER, CONTROLLER_CREATE_TESTDATA_TWEETS, CONTROLLER_CREATE_TESTDATA_USERS, - CONTROLLER_ADD_USER_FOR_SCREEN_NAME + CONTROLLER_ADD_USER_FOR_SCREEN_NAME, + REMOVE_OLD_DATA_FROM_STORAGE } diff --git a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTask.java b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTask.java index 95a56290..adade3d4 100644 --- a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTask.java +++ b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTask.java @@ -17,4 +17,6 @@ public interface AsyncStartTask { Task createTestDataForTweets(); Task createTestDataForUser(); + + Task removeOldDataFromStorage(); } diff --git a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/FindTweetsToRemoveSplitter.java b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/FindTweetsToRemoveSplitter.java new file mode 100644 index 00000000..b240f900 --- /dev/null +++ b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/FindTweetsToRemoveSplitter.java @@ -0,0 +1,12 @@ +package org.woehlke.twitterwall.scheduled.mq.endpoint; + +import org.springframework.messaging.Message; +import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage; +import org.woehlke.twitterwall.scheduled.mq.msg.TweetMessage; + +import java.util.List; + +public interface FindTweetsToRemoveSplitter { + + List> splitMessage(Message message); +} diff --git a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/AsyncStartTaskImpl.java b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/AsyncStartTaskImpl.java index 30a2f0d3..2d58d7bd 100644 --- a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/AsyncStartTaskImpl.java +++ b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/AsyncStartTaskImpl.java @@ -62,6 +62,12 @@ public Task createTestDataForUser() { return send(taskType); } + @Override + public Task removeOldDataFromStorage() { + TaskType taskType = TaskType.REMOVE_OLD_DATA_FROM_STORAGE; + return send(taskType); + } + private Task send(TaskType taskType){ SendType sendType = SendType.FIRE_AND_FORGET; String msg = "START Task "+taskType+" via MQ by "+sendType; diff --git a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/FindTweetsToRemoveSplitterImpl.java b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/FindTweetsToRemoveSplitterImpl.java new file mode 100644 index 00000000..10663d97 --- /dev/null +++ b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/FindTweetsToRemoveSplitterImpl.java @@ -0,0 +1,67 @@ +package org.woehlke.twitterwall.scheduled.mq.endpoint.impl; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; +import org.woehlke.twitterwall.oodm.entities.Task; +import org.woehlke.twitterwall.oodm.entities.Tweet; +import org.woehlke.twitterwall.oodm.entities.parts.CountedEntities; +import org.woehlke.twitterwall.oodm.service.CountedEntitiesService; +import org.woehlke.twitterwall.oodm.service.TaskService; +import org.woehlke.twitterwall.oodm.service.TweetService; +import org.woehlke.twitterwall.scheduled.mq.endpoint.FindTweetsToRemoveSplitter; +import org.woehlke.twitterwall.scheduled.mq.msg.TaskMessage; +import org.woehlke.twitterwall.scheduled.mq.msg.TweetMessage; + +import java.util.ArrayList; +import java.util.List; + +@Component("mqFindTweetsToRemoveSplitter") +public class FindTweetsToRemoveSplitterImpl implements FindTweetsToRemoveSplitter { + + private final TweetService tweetService; + + private final TaskService taskService; + + private final CountedEntitiesService countedEntitiesService; + + @Autowired + public FindTweetsToRemoveSplitterImpl(TweetService tweetService, TaskService taskService, CountedEntitiesService countedEntitiesService) { + this.tweetService = tweetService; + this.taskService = taskService; + this.countedEntitiesService = countedEntitiesService; + } + + @Override + public List> splitMessage(Message message) { + CountedEntities countedEntities = countedEntitiesService.countAll(); + List> tweets = new ArrayList<>(); + TaskMessage msgIn = message.getPayload(); + long id = msgIn.getTaskId(); + Task task = taskService.findById(id); + task = taskService.start(task,countedEntities); + int pageTweet = 1; + int pageSize = 20; + Pageable pageRequestTweet = new PageRequest(pageTweet, pageSize); + //TODO: #229 https://github.com/phasenraum2010/twitterwall2/issues/229 + Page tweetList = tweetService.getAll(pageRequestTweet); + int loopId = 0; + int loopAll = tweetList.getContent().size(); + for (Tweet tweet: tweetList) { + loopId++; + TweetMessage tweetMsg = new TweetMessage(msgIn,tweet); + Message mqMessageOut = + MessageBuilder.withPayload(tweetMsg) + .copyHeaders(message.getHeaders()) + .setHeader("tw_lfd_nr",loopId) + .setHeader("tw_all",loopAll) + .build(); + tweets.add(mqMessageOut); + } + return tweets; + } +} diff --git a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/StartTaskImpl.java b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/StartTaskImpl.java index 3a77654c..f08060d5 100644 --- a/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/StartTaskImpl.java +++ b/src/main/java/org/woehlke/twitterwall/scheduled/mq/endpoint/impl/StartTaskImpl.java @@ -62,6 +62,12 @@ public Task createTestDataForUser() { return sendAndReceiveUser(taskType); } + @Override + public Task removeOldDataFromStorage() { + TaskType taskType = TaskType.REMOVE_OLD_DATA_FROM_STORAGE; + return sendAndReceiveTweet(taskType); + } + @Override public User createImprintUser() { TaskType taskType = TaskType.CONTROLLER_CREATE_IMPRINT_USER; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ad86a9c6..9262f8f9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -124,6 +124,7 @@ twitterwall: fetchUserListAllow: ${TWITTERWALL_SCHEDULER_USER_LIST_ALLOW} fetchUserListName: ${TWITTERWALL_SCHEDULER_USER_LIST_NAME} herokuDbRowsLimit: ${TWITTERWALL_SCHEDULER_HEROKU_DB_LIMIT} + removeOldDataFromStorageAllow: ${TWITTERWALL_SCHEDULER_ALLOW_REMOVE_OLD_DATA_FROM_STORAGE} skipFortesting: false testdata: oodm: diff --git a/src/main/resources/integration.xml b/src/main/resources/integration.xml index 8a2b9fbc..83f7b9af 100644 --- a/src/main/resources/integration.xml +++ b/src/main/resources/integration.xml @@ -79,6 +79,11 @@ + + + + + @@ -89,6 +94,7 @@ + @@ -272,6 +278,23 @@ method="persistUser" /> + + + + + + + + @@ -288,6 +311,7 @@ + @@ -332,6 +356,12 @@ + + + + + + + + + + + + diff --git a/src/main/resources/templates/tickersymbol/all.html b/src/main/resources/templates/tickersymbol/all.html index ee564307..734ec6e2 100644 --- a/src/main/resources/templates/tickersymbol/all.html +++ b/src/main/resources/templates/tickersymbol/all.html @@ -27,7 +27,7 @@ - +
diff --git a/src/test/java/org/woehlke/twitterwall/ScheduledTasksTest.java b/src/test/java/org/woehlke/twitterwall/ScheduledTasksTest.java index 11eed711..6a5ab1d9 100644 --- a/src/test/java/org/woehlke/twitterwall/ScheduledTasksTest.java +++ b/src/test/java/org/woehlke/twitterwall/ScheduledTasksTest.java @@ -36,4 +36,9 @@ public void updateUserProfilesFromMentions() throws Exception { public void fetchUsersFromDefinedUserList() throws Exception { scheduledTasks.fetchUsersFromDefinedUserList(); } + + @Test + public void removeOldDataFromStorage() throws Exception { + scheduledTasks.removeOldDataFromStorage(); + } } diff --git a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AbstractMqEndpointTest.java b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AbstractMqEndpointTest.java index ee57e154..44be841a 100644 --- a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AbstractMqEndpointTest.java +++ b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AbstractMqEndpointTest.java @@ -6,7 +6,6 @@ public abstract class AbstractMqEndpointTest { protected boolean assertCountedEntities(CountedEntities beforeTest, CountedEntities afterTest) { - boolean result = true; boolean resultTask = afterTest.getCountTask() > beforeTest.getCountTask(); boolean resultTaskHistory = afterTest.getCountTaskHistory() > beforeTest.getCountTaskHistory(); @@ -30,7 +29,36 @@ protected boolean assertCountedEntities(CountedEntities beforeTest, CountedEntit Assert.assertTrue(resultMention); Assert.assertTrue(resultTickerSymbol); - result = resultTask && resultTaskHistory && resultTweets && resultUser && resultUrl && resultUrlCache && resultHashTags && resultHashTags && resultMedia && resultMention && resultTickerSymbol; + boolean result = resultTask && resultTaskHistory && resultTweets && resultUser && resultUrl && resultUrlCache && resultHashTags && resultHashTags && resultMedia && resultMention && resultTickerSymbol; + + return result; + } + + protected boolean assertCountedEntitiesReduced(CountedEntities beforeTest, CountedEntities afterTest) { + + boolean resultTask = afterTest.getCountTask() < beforeTest.getCountTask(); + boolean resultTaskHistory = afterTest.getCountTaskHistory() < beforeTest.getCountTaskHistory(); + boolean resultTweets = afterTest.getCountTweets()<=beforeTest.getCountTweets(); + boolean resultUser = afterTest.getCountUser()<=beforeTest.getCountUser(); + boolean resultUrl = afterTest.getCountUrl() <= beforeTest.getCountUrl(); + boolean resultUrlCache = afterTest.getCountUrlCache()<= beforeTest.getCountUrlCache(); + boolean resultHashTags = afterTest.getCountHashTags()<=beforeTest.getCountHashTags(); + boolean resultMedia = afterTest.getCountMedia()<=beforeTest.getCountMedia(); + boolean resultMention = afterTest.getCountMention() <=beforeTest.getCountMention(); + boolean resultTickerSymbol = afterTest.getCountTickerSymbol() <= beforeTest.getCountTickerSymbol(); + + Assert.assertTrue(resultTask); + Assert.assertTrue(resultTaskHistory); + Assert.assertTrue(resultTweets); + Assert.assertTrue(resultUser); + Assert.assertTrue(resultUrl); + Assert.assertTrue(resultUrlCache); + Assert.assertTrue(resultHashTags); + Assert.assertTrue(resultMedia); + Assert.assertTrue(resultMention); + Assert.assertTrue(resultTickerSymbol); + + boolean result = resultTask && resultTaskHistory && resultTweets && resultUser && resultUrl && resultUrlCache && resultHashTags && resultHashTags && resultMedia && resultMention && resultTickerSymbol; return result; } diff --git a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTest.java b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTest.java index 678372d6..843e3980 100644 --- a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTest.java +++ b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTest.java @@ -15,4 +15,6 @@ public interface AsyncStartTaskTest { void fetchTweetsFromSearchTest() throws Exception; void fetchUsersFromListTest() throws Exception; + + void removeOldDataFromStorage()throws Exception; } diff --git a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTestImpl.java b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTestImpl.java index 35901369..dce96105 100644 --- a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTestImpl.java +++ b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/AsyncStartTaskTestImpl.java @@ -1,6 +1,7 @@ package org.woehlke.twitterwall.scheduled.mq.endpoint; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -105,4 +106,23 @@ public void fetchUsersFromListTest() throws Exception { Assert.assertTrue(ok); log.info(msg+"FINISHED TEST"); } + + //TODO: #229 https://github.com/phasenraum2010/twitterwall2/issues/229 + @Ignore + @Test + @Override + public void removeOldDataFromStorage() throws Exception { + String msg = "removeOldDataFromStorage: "; + log.info(msg+"START TEST"); + CountedEntities beforeTest = countedEntitiesService.countAll(); + Task task = this.mqAsyncStartTask.removeOldDataFromStorage(); + log.info(msg+"created Task = "+task.getUniqueId()); + Assert.assertNotNull(task); + Assert.assertNotNull(task.getUniqueId()); + Assert.assertEquals(SendType.FIRE_AND_FORGET,task.getSendType()); + CountedEntities afterTest = countedEntitiesService.countAll(); + boolean ok = assertCountedEntitiesReduced(beforeTest,afterTest); + Assert.assertTrue(ok); + log.info(msg+"FINISHED TEST"); + } } diff --git a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/StartTaskTestImpl.java b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/StartTaskTestImpl.java index ed57cbbf..84eeca45 100644 --- a/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/StartTaskTestImpl.java +++ b/src/test/java/org/woehlke/twitterwall/scheduled/mq/endpoint/StartTaskTestImpl.java @@ -2,6 +2,7 @@ import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -102,6 +103,23 @@ public void fetchUsersFromListTest() throws Exception { log.info(msg+"FINISHED TEST"); } + //TODO: #229 https://github.com/phasenraum2010/twitterwall2/issues/229 + @Ignore + @Test + @Override + public void removeOldDataFromStorage() throws Exception { + String msg = "removeOldDataFromStorage: "; + log.info(msg+"START TEST"); + CountedEntities beforeTest = countedEntitiesService.countAll(); + Task task = this.mqStartTask.removeOldDataFromStorage(); + log.info(msg+"created Task = "+task.getUniqueId()); + Assert.assertEquals(SendType.SEND_AND_WAIT_FOR_RESULT,task.getSendType()); + CountedEntities afterTest = countedEntitiesService.countAll(); + boolean ok = assertCountedEntitiesReduced(beforeTest,afterTest); + Assert.assertTrue(ok); + log.info(msg+"FINISHED TEST"); + } + @Test public void createImprintUserTest() throws Exception { String msg = "createImprintUserTest: ";