Skip to content

Commit

Permalink
Stage 1 (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
ImLena committed Feb 18, 2024
1 parent e22da0c commit 8584357
Show file tree
Hide file tree
Showing 18 changed files with 1,944 additions and 0 deletions.
86 changes: 86 additions & 0 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/HttpServerImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package ru.vk.itmo.test.elenakhodosova;

import one.nio.http.*;
import one.nio.server.AcceptorConfig;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.elenakhodosova.dao.ReferenceDao;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

public class HttpServerImpl extends HttpServer {

private final ReferenceDao dao;

public HttpServerImpl(ServiceConfig config, ReferenceDao dao) throws IOException {
super(createServerConfig(config));
this.dao = dao;
}

private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig) {
HttpServerConfig httpServerConfig = new HttpServerConfig();
AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
httpServerConfig.closeSessions = true;
return httpServerConfig;
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_GET)
public Response getEntity(@Param(value = "id", required = true) String id) {
if (isParamIncorrect(id)) return new Response(Response.BAD_REQUEST, Response.EMPTY);
Entry<MemorySegment> value = dao.get(MemorySegment.ofArray(id.toCharArray()));
return value == null ? new Response(Response.NOT_FOUND, Response.EMPTY)
: Response.ok(value.value().toArray(ValueLayout.JAVA_BYTE));
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_PUT)
public Response putEntity(@Param(value = "id", required = true) String id, Request request) {
byte[] value = request.getBody();
if (isParamIncorrect(id)) return new Response(Response.BAD_REQUEST, Response.EMPTY);
dao.upsert(new BaseEntry<>(
MemorySegment.ofArray(id.toCharArray()),
MemorySegment.ofArray(value)));
return new Response(Response.CREATED, Response.EMPTY);
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_DELETE)
public Response deleteEntity(@Param(value = "id", required = true) String id) {
if (isParamIncorrect(id)) return new Response(Response.BAD_REQUEST, Response.EMPTY);
dao.upsert(new BaseEntry<>(MemorySegment.ofArray(id.toCharArray()), null));
return new Response(Response.ACCEPTED, Response.EMPTY);
}

@Path("/v0/entity")
public Response methodNotSupported() {
return new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);
}

@Override
public void handleDefault(Request request, HttpSession session) throws IOException {
Response badRequest = new Response(Response.BAD_REQUEST, Response.EMPTY);
session.sendResponse(badRequest);
}

@Override
public synchronized void stop() {
try {
dao.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
super.stop();
}

private boolean isParamIncorrect(String param) {
return param == null || param.isEmpty();
}
}
26 changes: 26 additions & 0 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/Server.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ru.vk.itmo.test.elenakhodosova;

import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.test.elenakhodosova.dao.ReferenceDao;

import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

public class Server {
public static final long FLUSH_THRESHOLD_BYTES = 4 * 1024 * 1024;
public static void main(String[] args) throws IOException {
ReferenceDao dao;
ServiceConfig config = new ServiceConfig(
8080,
"http://localhost",
List.of("http://localhost"),
Files.createTempDirectory(".")
);

dao = new ReferenceDao(new Config(config.workingDir(), FLUSH_THRESHOLD_BYTES));
HttpServerImpl server = new HttpServerImpl(config, dao);
server.start();
}
}
48 changes: 48 additions & 0 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/ServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.vk.itmo.test.elenakhodosova;

import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.test.ServiceFactory;
import ru.vk.itmo.test.elenakhodosova.dao.ReferenceDao;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;


public class ServiceImpl implements Service {

private HttpServerImpl server;
private ReferenceDao dao;
private final ServiceConfig config;
public static final long FLUSH_THRESHOLD_BYTES = 4 * 1024 * 1024;

public ServiceImpl(ServiceConfig config) {
this.config = config;

}

@Override
public CompletableFuture<Void> start() throws IOException {
dao = new ReferenceDao(new Config(config.workingDir(), FLUSH_THRESHOLD_BYTES));
this.server = new HttpServerImpl(config, dao);
server.start();
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> stop() throws IOException {
server.stop();
dao.close();
return CompletableFuture.completedFuture(null);
}

@ServiceFactory(stage = 1)
public static class Factory implements ServiceFactory.Factory {

@Override
public Service create(ServiceConfig config) {
return new ServiceImpl(config);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

/**
* Growable buffer with {@link ByteBuffer} and {@link MemorySegment} interface.
*
* @author incubos
*/
final class ByteArraySegment {
private byte[] array;
private MemorySegment segment;

ByteArraySegment(final int capacity) {
this.array = new byte[capacity];
this.segment = MemorySegment.ofArray(array);
}

void withArray(final ArrayConsumer consumer) throws IOException {
consumer.process(array);
}

MemorySegment segment() {
return segment;
}

void ensureCapacity(final long size) {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Too big!");
}

final int capacity = (int) size;
if (array.length >= capacity) {
return;
}

// Grow to the nearest bigger power of 2
final int newSize = Integer.highestOneBit(capacity) << 1;
array = new byte[newSize];
segment = MemorySegment.ofArray(array);
}

interface ArrayConsumer {
void process(byte[] array) throws IOException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Filters non tombstone {@link Entry}s.
*
* @author incubos
*/
final class LiveFilteringIterator implements Iterator<Entry<MemorySegment>> {
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> next;

LiveFilteringIterator(final Iterator<Entry<MemorySegment>> delegate) {
this.delegate = delegate;
skipTombstones();
}

private void skipTombstones() {
while (delegate.hasNext()) {
final Entry<MemorySegment> entry = delegate.next();
if (entry.value() != null) {
this.next = entry;
break;
}
}
}

@Override
public boolean hasNext() {
return next != null;
}

@Override
public Entry<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// Consume
final Entry<MemorySegment> result = next;
next = null;

skipTombstones();

return result;
}
}
49 changes: 49 additions & 0 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/MemTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* Memory table.
*
* @author incubos
*/
final class MemTable {
private final NavigableMap<MemorySegment, Entry<MemorySegment>> map =
new ConcurrentSkipListMap<>(
MemorySegmentComparator.INSTANCE);

boolean isEmpty() {
return map.isEmpty();
}

Iterator<Entry<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
if (from == null && to == null) {
// All
return map.values().iterator();
} else if (from == null) {
// Head
return map.headMap(to).values().iterator();
} else if (to == null) {
// Tail
return map.tailMap(from).values().iterator();
} else {
// Slice
return map.subMap(from, to).values().iterator();
}
}

Entry<MemorySegment> get(final MemorySegment key) {
return map.get(key);
}

Entry<MemorySegment> upsert(final Entry<MemorySegment> entry) {
return map.put(entry.key(), entry);
}
}
Loading

0 comments on commit 8584357

Please sign in to comment.