Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HWKALERTS-168 Support schema upgrades #216

Merged
merged 2 commits into from
Sep 28, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hawkular-alerts-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
<version>${version.org.drools}</version>
</dependency>

<dependency>
<groupId>org.cassalog</groupId>
<artifactId>cassalog</artifactId>
<version>${version.org.cassalog}</version>
</dependency>

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -30,6 +35,8 @@
import javax.ejb.Startup;
import javax.enterprise.inject.Produces;

import org.cassalog.core.Cassalog;
import org.cassalog.core.CassalogBuilder;
import org.hawkular.alerts.engine.util.TokenReplacingReader;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -82,6 +89,7 @@ public class CassCluster {
private static final String ALERTS_CASSANDRA_OVERWRITE = "hawkular-alerts.cassandra-overwrite";
private static final String ALERTS_CASSANDRA_OVERWRITE_ENV = "CASSANDRA_OVERWRITE";

private String keyspace;
private int attempts;
private int timeout;
private String cqlPort;
Expand All @@ -107,71 +115,71 @@ private void readProperties() {
ALERTS_CASSANDRA_READ_TIMEOUT_ENV, String.valueOf(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS)));
overwrite = Boolean.parseBoolean(AlertProperties.getProperty(ALERTS_CASSANDRA_OVERWRITE,
ALERTS_CASSANDRA_OVERWRITE_ENV, "false"));
keyspace = AlertProperties.getProperty(ALERTS_CASSANDRA_KEYSPACE, "hawkular_alerts");
}

@PostConstruct
public void initCassCluster() {
readProperties();
if (cluster == null && session == null) {

int currentAttempts = attempts;
/*
It might happen that alerts component is faster than embedded cassandra deployed in hawkular.
We will provide a simple attempt/retry loop to avoid issues at initialization.
*/
while(session == null && !Thread.currentThread().isInterrupted() && currentAttempts >= 0) {
try {
SocketOptions socketOptions = null;
if (connTimeout != SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS ||
readTimeout != SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS) {
socketOptions = new SocketOptions();
if (connTimeout != SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS) {
socketOptions.setConnectTimeoutMillis(connTimeout);
}
if (readTimeout != SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS) {
socketOptions.setReadTimeoutMillis(readTimeout);
}
}

Cluster.Builder clusterBuilder = new Cluster.Builder()
.addContactPoints(nodes.split(","))
.withPort(new Integer(cqlPort))
.withProtocolVersion(ProtocolVersion.V3)
.withQueryOptions(new QueryOptions().setRefreshSchemaIntervalMillis(0));
int currentAttempts = attempts;

if (socketOptions != null) {
clusterBuilder.withSocketOptions(socketOptions);
}
SocketOptions socketOptions = null;
if (connTimeout != SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS ||
readTimeout != SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS) {
socketOptions = new SocketOptions();
if (connTimeout != SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS) {
socketOptions.setConnectTimeoutMillis(connTimeout);
}
if (readTimeout != SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS) {
socketOptions.setReadTimeoutMillis(readTimeout);
}
}

cluster = clusterBuilder.build();
session = cluster.connect();
} catch (Exception e) {
log.warn("Could not connect to Cassandra cluster - assuming is not up yet. Cause: " +
((e.getCause() == null) ? e : e.getCause()));
if (attempts == 0) {
throw e;
}
}
if (session == null) {
log.warn("[" + currentAttempts + "] Retrying connecting to Cassandra cluster " +
"in [" + timeout + "]ms...");
currentAttempts--;
try {
Thread.sleep(timeout);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
Cluster.Builder clusterBuilder = new Cluster.Builder()
.addContactPoints(nodes.split(","))
.withPort(new Integer(cqlPort))
.withProtocolVersion(ProtocolVersion.V3)
.withQueryOptions(new QueryOptions().setRefreshSchemaIntervalMillis(0));

if (socketOptions != null) {
clusterBuilder.withSocketOptions(socketOptions);
}

/*
It might happen that alerts component is faster than embedded cassandra deployed in hawkular.
We will provide a simple attempt/retry loop to avoid issues at initialization.
*/
while(session == null && !Thread.currentThread().isInterrupted() && currentAttempts >= 0) {
try {
cluster = clusterBuilder.build();
session = cluster.connect();
} catch (Exception e) {
log.warn("Could not connect to Cassandra cluster - assuming is not up yet. Cause: " +
((e.getCause() == null) ? e : e.getCause()));
if (attempts == 0) {
throw e;
}
}
if (session != null && !initialized) {
String keyspace = AlertProperties.getProperty(ALERTS_CASSANDRA_KEYSPACE, "hawkular_alerts");
if (session == null) {
log.warn("[" + currentAttempts + "] Retrying connecting to Cassandra cluster " +
"in [" + timeout + "]ms...");
currentAttempts--;
try {
initScheme(session, keyspace, overwrite);
} catch (IOException e) {
log.error("Error on initialization of Alerts scheme", e);
Thread.sleep(timeout);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
if (session != null) {
try {
initScheme();
} catch (IOException e) {
log.error("Error on initialization of Alerts scheme", e);
}
}

if (session == null) {
throw new RuntimeException("Cassandra session is null");
}
Expand All @@ -180,19 +188,18 @@ public void initCassCluster() {
}
}

private void initScheme(Session session, String keyspace, boolean overwrite) throws IOException {
private void initScheme() throws IOException {

if (log.isDebugEnabled()) {
log.debug("Checking Schema existence for keyspace: " + keyspace);
log.debugf("Checking Schema existence for keyspace: %s", keyspace);
}

KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace);
if (keyspaceMetadata != null) {
if (overwrite) {
session.execute("DROP KEYSPACE " + keyspace);
} else {
// If overwrite flag is true it should not check if all tables are created
if (!overwrite) {
int currentAttempts = attempts;
while(!checkSchema(keyspace) && !Thread.currentThread().isInterrupted() && currentAttempts >= 0) {
while(!checkSchema() && !Thread.currentThread().isInterrupted() && currentAttempts >= 0) {
log.warn("[" + currentAttempts + "] Keyspace detected but schema not fully created. " +
"Retrying in [" + timeout + "]ms...");
currentAttempts--;
Expand All @@ -202,8 +209,9 @@ private void initScheme(Session session, String keyspace, boolean overwrite) thr
Thread.currentThread().interrupt();
}
}
if (!checkSchema(keyspace)) {
if (!checkSchema()) {
log.errorf("Keyspace detected, but failed on check phase.", keyspace);
initialized = false;
return;
}
log.debug("Schema already exist. Skipping schema creation.");
Expand All @@ -214,41 +222,25 @@ private void initScheme(Session session, String keyspace, boolean overwrite) thr

log.infof("Creating Schema for keyspace %s", keyspace);

ImmutableMap<String, String> schemaVars = ImmutableMap.of("keyspace", keyspace);
createSchema(session, keyspace, overwrite);

String updatedCQL = null;
try (InputStream isSchema = CassCluster.class.getResourceAsStream("/hawkular-alerts-schema.cql");
InputStreamReader readerSchema = new InputStreamReader(isSchema);) {
String content = CharStreams.toString(readerSchema);
for (String cql : content.split("(?m)^-- #.*$")) {
if (!cql.startsWith("--")) {
updatedCQL = substituteVars(cql.trim(), schemaVars);
if (log.isDebugEnabled()) {
log.debug("Executing CQL:\n" + updatedCQL + "\n");
}
session.execute(updatedCQL);
}
}
} catch (Exception e) {
log.errorf("Failed schema creation: %s\nEXECUTING CQL:\n%s", e, updatedCQL);
}
initialized = true;

log.infof("Done creating Schema for keyspace: " + keyspace);
log.infof("Done creating Schema for keyspace: %s", keyspace);
}

private boolean checkSchema(String keyspace) {
private boolean checkSchema() {
ImmutableMap<String, String> schemaVars = ImmutableMap.of("keyspace", keyspace);

String updatedCQL = null;
try (InputStream isChecker = CassCluster.class.getResourceAsStream("/hawkular-alerts-checker.cql");
try (InputStream isChecker = CassCluster.class.getResourceAsStream("/org/hawkular/alerts/schema/checker.cql");
InputStreamReader readerChecker = new InputStreamReader(isChecker);) {
String content = CharStreams.toString(readerChecker);
for (String cql : content.split("(?m)^-- #.*$")) {
if (!cql.startsWith("--")) {
updatedCQL = substituteVars(cql.trim(), schemaVars);
if (log.isDebugEnabled()) {
log.debug("Checking CQL:\n" + updatedCQL + "\n");
log.debugf("Checking CQL:\n %s \n",updatedCQL);
}
ResultSet rs = session.execute(updatedCQL);
if (rs.isExhausted()) {
Expand Down Expand Up @@ -278,6 +270,48 @@ private String substituteVars(String cql, Map<String, String> vars) {
}
}

private URI getCassalogScript() {
try {
return getClass().getResource("/org/hawkular/alerts/schema/cassalog.groovy").toURI();
} catch (URISyntaxException e) {
throw new RuntimeException("Failed to load schema change script", e);
}
}

private String getNewHawkularAlertingVersion() {
try {
Enumeration<URL> resources = getClass().getClassLoader().getResources("META-INF/MANIFEST.MF");
while (resources.hasMoreElements()) {
URL resource = resources.nextElement();
Manifest manifest = new Manifest(resource.openStream());
String vendorId = manifest.getMainAttributes().getValue("Implementation-Vendor-Id");
if (vendorId != null && vendorId.equals("org.hawkular.alerts")) {
return manifest.getMainAttributes().getValue("Implementation-Version");
}
}
throw new RuntimeException("Unable to determine implementation version for Hawkular Alerting");
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void createSchema(Session session, String keyspace, boolean resetDB) {

CassalogBuilder builder = new CassalogBuilder();
Cassalog cassalog = builder.withKeyspace(keyspace).withSession(session).build();
Map<String, ?> vars = ImmutableMap.of(
"keyspace", keyspace,
"reset", resetDB,
"session", session
);
// List of versions of alerting
URI script = getCassalogScript();
cassalog.execute(script, vars);

session.execute("INSERT INTO " + keyspace + ".sys_config (config_id, name, value) VALUES " +
"('org.hawkular.alerts', 'version', '" + getNewHawkularAlertingVersion() + "')");
}

@Produces
@CassClusterSession
/*
Expand Down
Loading