
Spring Boot Integration with HBase

springboot java backend · 2023-09-05 06:09:20 · 852 views · 泡泡鱼



Integrating Spring Boot with the HBase database

1. Add dependencies
<dependency>
    <groupId>com.spring4all</groupId>
    <artifactId>spring-boot-starter-hbase</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop-hbase</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
2. Add configuration

Configuration via YAML:
spring:
  hbase:
    zookeeper:
      quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org
      property:
        clientPort: 2181
  data:
    hbase:
      quorum: XXX
      rootDir: XXX
      nodeParent: XXX
zookeeper:
  znode:
    parent: /hbase
3. Add a configuration class
@Configuration
public class HBaseConfig {
    @Bean
    public HBaseService getHbaseService() throws IOException {
        // Set a temporary Hadoop environment variable; the program looks for winutils.exe
        // under the \bin directory there (needed when connecting to Hadoop from Windows)
        //System.setProperty("hadoop.home.dir", "D:\\Program Files\\Hadoop");
        // This step reads matching config files (e.g. hbase-site.xml) from the resources directory
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        return new HBaseService(conf);
    }
}
4. Implementing HBase operations with a utility class
@Service
@Slf4j
public class HBaseService {
    private Admin admin = null;
    private Connection connection = null;

    public HBaseService(Configuration conf) throws IOException {
        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    }

    // Create a table: create '<table>', {NAME => ..., VERSIONS => ...}
    public boolean creatTable(String tableName, List<String> columnFamily) throws IOException {
        // Column family descriptors
        List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
        columnFamily.forEach(cf ->
            cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build()));
        // Table descriptor
        TableDescriptor tableDesc = TableDescriptorBuilder
            .newBuilder(TableName.valueOf(tableName))
            .setColumnFamilies(cfDesc)
            .build();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            log.debug("table exists!");
        } else {
            admin.createTable(tableDesc);
            log.debug("create table success!");
        }
        close(admin, null, null);
        return true;
    }

    public List<String> getAllTableNames() throws IOException {
        List<String> result = new ArrayList<>();
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            result.add(tableName.getNameAsString());
        }
        close(admin, null, null);
        return result;
    }

    public Map<String, Map<String, String>> getResultScanner(String tableName) throws IOException {
        Scan scan = new Scan();
        return this.queryData(tableName, scan);
    }

    private Map<String, Map<String, String>> queryData(String tableName, Scan scan) throws IOException {
        Map<String, Map<String, String>> result = new HashMap<>();
        Table table = getTable(tableName);
        ResultScanner rs = table.getScanner(scan);
        for (Result r : rs) {
            // One row of data
            Map<String, String> columnMap = new HashMap<>();
            String rowKey = null;
            // Row key, column family and column qualifier together identify a Cell
            for (Cell cell : r.listCells()) {
                if (rowKey == null) {
                    rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                }
                columnMap.put(
                    // Column qualifier
                    Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                    // Cell value
                    Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
            if (rowKey != null) {
                result.put(rowKey, columnMap);
            }
        }
        close(null, rs, table);
        return result;
    }

    public void putData(String tableName, String rowKey, String familyName,
                        String[] columns, String[] values) throws IOException {
        Table table = getTable(tableName);
        putData(table, rowKey, tableName, familyName, columns, values);
        close(null, null, table);
    }

    private void putData(Table table, String rowKey, String tableName,
                         String familyName, String[] columns, String[] values) throws IOException {
        // Set the row key
        Put put = new Put(Bytes.toBytes(rowKey));
        if (columns != null && values != null && columns.length == values.length) {
            for (int i = 0; i < columns.length; i++) {
                if (columns[i] != null && values[i] != null) {
                    put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                } else {
                    throw new NullPointerException(MessageFormat.format(
                        "Column name and value must not be null, column:{0}, value:{1}", columns[i], values[i]));
                }
            }
        }
        table.put(put);
        log.debug("putData add or update data success, rowKey:" + rowKey);
    }

    private Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }

    private void close(Admin admin, ResultScanner rs, Table table) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error("Failed to close Admin", e);
            }
        }
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                log.error("Failed to close Table", e);
            }
        }
    }
}
Test
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class HBaseApplicationTests {
    @Resource
    private HBaseService hbaseService;

    // Test table creation
    @Test
    public void testCreateTable() throws Exception {
        hbaseService.creatTable("test_base", Arrays.asList("a", "back"));
    }

    // Test inserting data
    @Test
    public void testPutData() throws Exception {
        hbaseService.putData("test_base", "000001", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.putData("test_base", "000002", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
                "0.0000", "9.8679", "null"});
        hbaseService.putData("test_base", "000003", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "education", "0.8984",
                "0.0000", "25.5649", "null"});
    }

    // Test a full table scan
    @Test
    public void testGetResultScanner() throws Exception {
        Map<String, Map<String, String>> result2 = hbaseService.getResultScanner("test_base");
        System.out.println("----- full table scan result -----");
        result2.forEach((k, value) -> {
            System.out.println(k + "--->" + value);
        });
    }
}

III. Using spring-data-hadoop-hbase

3. Configuration class
@Configuration
public class HBaseConfiguration {
    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${zookeeper.znode.parent}")
    private String znodeParent;

    @Bean
    public HbaseTemplate hbaseTemplate() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        conf.set("zookeeper.znode.parent", znodeParent);
        return new HbaseTemplate(conf);
    }
}
4. Using HbaseTemplate in the service layer

This serves as a utility class:

@Service
@Slf4j
public class HBaseService {
    @Autowired
    private HbaseTemplate hbaseTemplate;

    // Query by column family / qualifier
    public List<Result> getRowKeyAndColumn(String tableName, String startRowkey,
            String stopRowkey, String column, String qualifier) {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (StringUtils.isNotBlank(column)) {
            log.debug("{}", column);
            filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                    new BinaryComparator(Bytes.toBytes(column))));
        }
        if (StringUtils.isNotBlank(qualifier)) {
            log.debug("{}", qualifier);
            filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
                    new BinaryComparator(Bytes.toBytes(qualifier))));
        }
        Scan scan = new Scan();
        if (filterList.getFilters().size() > 0) {
            scan.setFilter(filterList);
        }
        scan.setStartRow(Bytes.toBytes(startRowkey));
        scan.setStopRow(Bytes.toBytes(stopRowkey));
        return hbaseTemplate.find(tableName, scan, (result, rowNum) -> result);
    }

    public List<Result> getListRowkeyData(String tableName, List<String> rowKeys,
            String familyColumn, String column) {
        return rowKeys.stream().map(rk -> {
            if (StringUtils.isNotBlank(familyColumn)) {
                if (StringUtils.isNotBlank(column)) {
                    return hbaseTemplate.get(tableName, rk, familyColumn, column,
                            (result, rowNum) -> result);
                } else {
                    return hbaseTemplate.get(tableName, rk, familyColumn,
                            (result, rowNum) -> result);
                }
            }
            return hbaseTemplate.get(tableName, rk, (result, rowNum) -> result);
        }).collect(Collectors.toList());
    }
}

IV. Using spring-boot-starter-data-hbase

Reference: https://blog.csdn.net/cpongo1/article/details/89550486

1. Download and install the starter

## Download the spring-boot-starter-hbase source
git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
## Install
cd spring-boot-starter-hbase
mvn clean install
2. Add configuration properties
  • spring.data.hbase.quorum: the ZooKeeper quorum address for HBase
  • spring.data.hbase.rootDir: the HBase storage path on HDFS
  • spring.data.hbase.nodeParent: the root ZNode for HBase in ZooKeeper
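A minimal application.yml sketch for these three properties, assuming the starter binds them under the spring.data.hbase prefix as listed above (hostnames and paths are placeholders, not values from the original article):

```yaml
spring:
  data:
    hbase:
      # ZooKeeper quorum used by the HBase client
      quorum: zk1.example.org:2181,zk2.example.org:2181
      # HBase storage root on HDFS
      rootDir: hdfs://namenode:9000/hbase
      # Root ZNode for HBase in ZooKeeper
      nodeParent: /hbase
```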
3. Define the DTO
@Data
public class City {
    private Long id;
    private Integer age;
    private String cityName;
}
4. Create the corresponding RowMapper
public class CityRowMapper implements RowMapper<City> {
    private static byte[] COLUMN_FAMILY = "f".getBytes();
    private static byte[] NAME = "name".getBytes();
    private static byte[] AGE = "age".getBytes();

    @Override
    public City mapRow(Result result, int rowNum) throws Exception {
        String name = Bytes.toString(result.getValue(COLUMN_FAMILY, NAME));
        int age = Bytes.toInt(result.getValue(COLUMN_FAMILY, AGE));
        City dto = new City();
        dto.setCityName(name);
        dto.setAge(age);
        return dto;
    }
}
5. Implement insert, update and query operations
  • HbaseTemplate.find returns a list of City objects mapped from HBase
  • HbaseTemplate.get returns the City for a given row
  • HbaseTemplate.saveOrUpdates saves or updates records
    If HbaseTemplate does not cover your needs, you can call its getConnection() method to obtain a connection and, following the pattern of HbaseTemplate's own implementation, build more complex queries.
@Service
public class CityServiceImpl implements CityService {
    @Autowired
    private HbaseTemplate hbaseTemplate;

    // Range query
    public List<City> query(String startRow, String stopRow) {
        Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
        scan.setCaching(5000);
        List<City> dtos = this.hbaseTemplate.find("people_table", scan, new CityRowMapper());
        return dtos;
    }

    // Single-row query
    public City query(String row) {
        City dto = this.hbaseTemplate.get("people_table", row, new CityRowMapper());
        return dto;
    }

    // Insert or update
    public void saveOrUpdate() {
        List<Mutation> saveOrUpdates = new ArrayList<Mutation>();
        Put put = new Put(Bytes.toBytes("135xxxxxx"));
        put.addColumn(Bytes.toBytes("people"), Bytes.toBytes("name"), Bytes.toBytes("test"));
        saveOrUpdates.add(put);
        this.hbaseTemplate.saveOrUpdates("people_table", saveOrUpdates);
    }
}

Spring Boot Integration with InfluxDB

Chinese documentation: https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html

Note: this project is built on top of spring-boot-web.

1. Add dependencies
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
</dependency>
2. Add configuration
spring:
  influx:
    database: my_sensor1
    password: admin
    url: http://127.0.0.1:6086
    user: admin
3. Write the configuration class
@Configuration
public class InfluxdbConfig {
    @Value("${spring.influx.url}")
    private String influxDBUrl;

    @Value("${spring.influx.user}")
    private String userName;

    @Value("${spring.influx.password}")
    private String password;

    @Value("${spring.influx.database}")
    private String database;

    @Bean("influxDB")
    public InfluxDB influxdb() {
        InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
        try {
            // Batch writes: flush every 100 points, or at least every 60 seconds
            influxDB.setDatabase(database).enableBatch(100, 1000 * 60, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Set the default retention policy
            influxDB.setRetentionPolicy("sensor_retention");
        }
        // Set the client log level
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
        return influxDB;
    }
}
4. Using the native InfluxDB API
@SpringBootTest(classes = {MainApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class InfluxdbDBTest {
    @Autowired
    private InfluxDB influxDB;

    // measurement name
    private final String measurement = "sensor";

    @Value("${spring.influx.database}")
    private String database;

    @Test
    public void insert() {
        List<String> lines = new ArrayList<String>();
        Point point = null;
        for (int i = 0; i < 50; i++) {
            point = Point.measurement(measurement)
                .tag("deviceId", "sensor" + i)
                .addField("temp", 3)
                .addField("voltage", 145 + i)
                .addField("A1", "4i")
                .addField("A2", "4i").build();
            lines.add(point.lineProtocol());
        }
        // Write the lines
        influxDB.write(lines);
    }

    @Test
    public void batchInsert() {
        BatchPoints batchPoints = BatchPoints
                .database(database)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
        // Iterate over the source data
        for (int i = 0; i < 50; i++) {
            // Build one point for the measurement
            Point point = Point.measurement(measurement)
                    // tag values can only be stored as String
                    .tag("deviceId", "sensor" + i)
                    .addField("temp", 3)
                    .addField("voltage", 145 + i)
                    .addField("A1", "4i")
                    .addField("A2", "4i").build();
            // Collect the point into the batch
            batchPoints.point(point);
        }
        // Batch insert
        influxDB.write(batchPoints);
    }

    @Test
    public void datas() {
        int page = 1;  // @Test methods take no parameters, so the page number is fixed here
        int pageSize = 10;
        // InfluxDB supports pagination via LIMIT/OFFSET
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        String queryCondition = "";  // query condition, empty for now
        String queryCmd = "SELECT * FROM "
            // To read from a specific retention policy use retentionPolicyName.measurement;
            // the default policy can be omitted: // + retentionPolicyName + "."
            + measurement
            // Append the query condition (filter on tag values; filtering on field values is much slower)
            + queryCondition
            // Order results by time
            + " ORDER BY time DESC"
            // Append pagination
            + pageQuery;
        QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
        System.out.println("query result => " + queryResult);
    }
}
5. Using a wrapper utility class
1. Create the entity class
@Data
@Measurement(name = "sensor")
public class Sensor {
    @Column(name = "deviceId", tag = true)
    private String deviceId;

    @Column(name = "temp")
    private float temp;

    @Column(name = "voltage")
    private float voltage;

    @Column(name = "A1")
    private float A1;

    @Column(name = "A2")
    private float A2;

    @Column(name = "time")
    private String time;
}
2. Create the utility class
@Component
public class InfluxdbUtils {
    @Autowired
    private InfluxDB influxDB;

    @Value("${spring.influx.database}")
    private String database;

    @SneakyThrows
    public void insertOne(Object obj) {
        // Read the measurement name from the class annotation
        Class<?> clasz = obj.getClass();
        Measurement measurement = clasz.getAnnotation(Measurement.class);
        Point.Builder builder = Point.measurement(measurement.name());
        // Map annotated fields to tags and fields
        for (Field field : clasz.getDeclaredFields()) {
            Column column = field.getAnnotation(Column.class);
            field.setAccessible(true);
            if (column.tag()) {
                // tag values can only be stored as String
                builder.tag(column.name(), field.get(obj).toString());
            } else if (field.get(obj) != null) {
                builder.addField(column.name(), field.get(obj).toString());
            }
        }
        influxDB.write(builder.build());
    }

    @SneakyThrows
    public void insertBatchByRecords(List<?> records) {
        List<String> lines = new ArrayList<String>();
        // Plain for loops are used instead of forEach lambdas so that
        // @SneakyThrows covers the checked reflection exceptions
        for (Object record : records) {
            Class<?> clasz = record.getClass();
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            Point.Builder builder = Point.measurement(measurement.name());
            for (Field field : clasz.getDeclaredFields()) {
                Column column = field.getAnnotation(Column.class);
                field.setAccessible(true);
                if (column.tag()) {
                    builder.tag(column.name(), field.get(record).toString());
                } else if (field.get(record) != null) {
                    builder.addField(column.name(), field.get(record).toString());
                }
            }
            lines.add(builder.build().lineProtocol());
        }
        influxDB.write(lines);
    }

    @SneakyThrows
    public void insertBatchByPoints(List<?> records) {
        BatchPoints batchPoints = BatchPoints.database(database)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
        for (Object record : records) {
            Class<?> clasz = record.getClass();
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            Point.Builder builder = Point.measurement(measurement.name());
            for (Field field : clasz.getDeclaredFields()) {
                Column column = field.getAnnotation(Column.class);
                field.setAccessible(true);
                if (column.tag()) {
                    builder.tag(column.name(), field.get(record).toString());
                } else if (field.get(record) != null) {
                    builder.addField(column.name(), field.get(record).toString());
                }
            }
            batchPoints.point(builder.build());
        }
        influxDB.write(batchPoints);
    }

    public List<Object> fetchRecords(String query) {
        List<Object> results = new ArrayList<Object>();
        QueryResult queryResult = influxDB.query(new Query(query, database));
        queryResult.getResults().forEach(result -> {
            result.getSeries().forEach(serial -> {
                List<String> columns = serial.getColumns();
                int fieldSize = columns.size();
                serial.getValues().forEach(value -> {
                    Map<String, Object> obj = new HashMap<String, Object>();
                    for (int i = 0; i < fieldSize; i++) {
                        obj.put(columns.get(i), value.get(i));
                    }
                    results.add(obj);
                });
            });
        });
        return results;
    }

    public List<Object> fetchRecords(String fieldKeys, String measurement) {
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        return this.fetchRecords(query.toString());
    }

    public List<Object> fetchRecords(String fieldKeys, String measurement, String order) {
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        return this.fetchRecords(query.toString());
    }

    public List<Object> fetchRecords(String fieldKeys, String measurement, String order, String limit) {
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        query.append(limit);
        return this.fetchRecords(query.toString());
    }

    @SneakyThrows
    public <T> List<T> fetchResults(String query, Class<T> clasz) {
        List<T> results = new ArrayList<>();
        QueryResult queryResult = influxDB.query(new Query(query, database));
        for (QueryResult.Result result : queryResult.getResults()) {
            for (QueryResult.Series serial : result.getSeries()) {
                List<String> columns = serial.getColumns();
                int fieldSize = columns.size();
                for (List<Object> value : serial.getValues()) {
                    T obj = clasz.newInstance();
                    for (int i = 0; i < fieldSize; i++) {
                        String fieldName = columns.get(i);
                        Field field = clasz.getDeclaredField(fieldName);
                        field.setAccessible(true);
                        Class<?> type = field.getType();
                        if (type == float.class) {
                            field.set(obj, Float.valueOf(value.get(i).toString()));
                        } else {
                            field.set(obj, value.get(i));
                        }
                    }
                    results.add(obj);
                }
            }
        }
        return results;
    }

    public <T> List<T> fetchResults(String fieldKeys, String measurement, Class<T> clasz) {
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        return this.fetchResults(query.toString(), clasz);
    }

    public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, Class<T> clasz) {
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        return this.fetchResults(query.toString(), clasz);
    }

    public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, String limit, Class<T> clasz) {
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        query.append(limit);
        return this.fetchResults(query.toString(), clasz);
    }
}
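The core of the utility class above is plain reflection over annotated fields. The dependency-free sketch below illustrates just that mapping idea: the nested @Measurement/@Column annotations, the Sensor class and the toLine() helper are hypothetical stand-ins for the influxdb-java annotations and Point.Builder, so the output format is only an approximation of real line protocol.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class LineProtocolSketch {
    // Hypothetical stand-ins for influxdb-java's @Measurement/@Column
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Measurement { String name(); }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Column { String name(); boolean tag() default false; }

    @Measurement(name = "sensor")
    static class Sensor {
        @Column(name = "deviceId", tag = true) String deviceId = "0002";
        @Column(name = "temp") float temp = 10f;
    }

    // Reflect over annotated fields, appending tags to the measurement
    // section and values to the field section, as insertOne does
    static String toLine(Object obj) {
        try {
            Class<?> clasz = obj.getClass();
            StringBuilder tags = new StringBuilder(clasz.getAnnotation(Measurement.class).name());
            StringBuilder fields = new StringBuilder();
            for (Field f : clasz.getDeclaredFields()) {
                Column c = f.getAnnotation(Column.class);  // assumes every field is annotated
                f.setAccessible(true);
                if (c.tag()) {
                    tags.append(',').append(c.name()).append('=').append(f.get(obj));
                } else {
                    if (fields.length() > 0) fields.append(',');
                    fields.append(c.name()).append('=').append(f.get(obj));
                }
            }
            return tags + " " + fields;
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toLine(new Sensor()));  // sensor,deviceId=0002 temp=10.0
    }
}
```

The real utility delegates escaping, type handling and timestamps to Point.Builder; this sketch only shows why a single generic insertOne(Object) can serve every annotated entity.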
3. Test code for the utility class
@SpringBootTest(classes = {MainApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class InfluxdbUtilTest {
    @Autowired
    private InfluxdbUtils influxdbUtils;

    @Test
    public void insert() {
        Sensor sensor = new Sensor();
        sensor.setA1(10);
        sensor.setA2(10);
        sensor.setDeviceId("0002");
        sensor.setTemp(10L);
        sensor.setTime("2021-01-19");
        sensor.setVoltage(10);
        influxdbUtils.insertOne(sensor);
    }

    @Test
    public void batchInsert() {
        List<Sensor> sensorList = new ArrayList<Sensor>();
        for (int i = 0; i < 50; i++) {
            Sensor sensor = new Sensor();
            sensor.setA1(2);
            sensor.setA2(12);
            sensor.setTemp(9);
            sensor.setVoltage(12);
            sensor.setDeviceId("sensor4545-" + i);
            sensorList.add(sensor);
        }
        influxdbUtils.insertBatchByRecords(sensorList);
    }

    @Test
    public void batchInsert1() {
        List<Sensor> sensorList = new ArrayList<Sensor>();
        Sensor sensor = null;
        for (int i = 0; i < 50; i++) {
            sensor = new Sensor();
            sensor.setA1(2);
            sensor.setA2(12);
            sensor.setTemp(9);
            sensor.setVoltage(12);
            sensor.setDeviceId("sensor4545-" + i);
            sensorList.add(sensor);
        }
        influxdbUtils.insertBatchByPoints(sensorList);
    }

    @Test
    public void datas() {
        int page = 1;  // fixed page number; @Test methods cannot take request parameters
        int pageSize = 10;
        // InfluxDB supports pagination via LIMIT/OFFSET
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        String queryCondition = "";  // query condition, empty for now
        String queryCmd = "SELECT * FROM sensor"
            // To read from a specific retention policy use retentionPolicyName.measurement;
            // the default policy can be omitted
            + queryCondition
            // Order results by time
            + " ORDER BY time DESC"
            // Append pagination
            + pageQuery;
        List<Object> sensorList = influxdbUtils.fetchRecords(queryCmd);
        System.out.println("query result => " + sensorList);
    }

    @Test
    public void datas1() {
        int page = 1;
        int pageSize = 10;
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        String queryCondition = "";
        String queryCmd = "SELECT * FROM sensor"
            + queryCondition
            + " ORDER BY time DESC"
            + pageQuery;
        List<Sensor> sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class);
        //List<Sensor> sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class);
        sensorList.forEach(sensor -> System.out.println("query result => " + sensor));
    }
}
6. Using an annotated data model
1. Create a retention policy in InfluxDB
CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
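This policy keeps data for 30 days with a single replica and, because of the DEFAULT keyword, becomes the database's default policy. Reading from a specific policy uses the "policy"."measurement" prefix; a sketch against the db_order database from this section (the bu_id value 'bu01' is a made-up sample):

```sql
SELECT * FROM "rp_order_payment"."m_order_payment"
WHERE "bu_id" = 'bu01'
ORDER BY time DESC
LIMIT 10
```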
2. Create the data model
@Data
@Measurement(name = "m_order_payment", database = "db_order", retentionPolicy = "rp_order_payment")
public class OrderPayment implements Serializable {
    // Statistics batch
    @Column(name = "batch_id", tag = true)
    private String batchId;
    // Business unit id
    @Column(name = "bu_id", tag = true)
    private String buId;
    // Business unit name
    @Column(name = "bu_name")
    private String buName;
    // Total count
    @Column(name = "total_count", tag = true)
    private String totalCount;
    // Paid count
    @Column(name = "pay_count", tag = true)
    private String payCount;
    // Total amount
    @Column(name = "total_money", tag = true)
    private String totalMoney;
}
3. Create the Mapper
public class InfluxMapper extends InfluxDBMapper {
    public InfluxMapper(InfluxDB influxDB) {
        super(influxDB);
    }
}
4. Configure the Mapper
@Log4j2
@Configuration
public class InfluxAutoConfiguration {
    @Bean
    public InfluxMapper influxMapper(InfluxDB influxDB) {
        return new InfluxMapper(influxDB);
    }
}
5. Test CRUD operations
@SpringBootTest(classes = {MainApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class InfluxdbMapperTest {
    @Autowired
    private InfluxMapper influxMapper;

    @Test
    public void save() {
        // @Test methods take no parameters, so build the record inline
        // (populate its fields before saving in a real test)
        OrderPayment product = new OrderPayment();
        influxMapper.save(product);
    }

    @Test
    public void queryAll() {
        List<OrderPayment> products = influxMapper.query(OrderPayment.class);
        System.out.println(products);
    }

    @Test
    public void queryByBu() {
        String bu = "bu01";  // sample bu_id
        String sql = String.format("%s'%s'", "select * from m_order_payment where bu_id = ", bu);
        Query query = new Query(sql, "db_order");
        List<OrderPayment> products = influxMapper.query(query, OrderPayment.class);
        System.out.println(products);
    }
}

Reference: https://blog.csdn.net/cpongo1/article/details/89550486

https://github.com/SpringForAll/spring-boot-starter-hbase

https://github.com/JeffLi1993/springboot-learning-example

Source: https://blog.csdn.net/QingChunBuSanChang/article/details/132596960
