CofeehousePy/hyper_internal_service
Netkas 3294bc4ea6 Added NSFW classification 2021-01-14 02:07:24 -05:00
..
hyper_internal_service Added NSFW classification 2021-01-14 02:07:24 -05:00
requirements Added NSFW classification 2021-01-14 02:07:24 -05:00
vendor/http-parser Added NSFW classification 2021-01-14 02:07:24 -05:00
LICENSE Added NSFW classification 2021-01-14 02:07:24 -05:00
MANIFEST.in Added NSFW classification 2021-01-14 02:07:24 -05:00
Makefile Added NSFW classification 2021-01-14 02:07:24 -05:00
README.md Added Hyper-Internal-Service 2020-12-25 15:14:57 -05:00
cython_requirements.txt Added NSFW classification 2021-01-14 02:07:24 -05:00
dev_requirements.txt Added NSFW classification 2021-01-14 02:07:24 -05:00
setup.py Added NSFW classification 2021-01-14 02:07:24 -05:00

README.md

Hyper Internal Service

Hyper Internal Service is a internal async HTTP client/server allowing for different internal components to communicate with each other using various interchangeable data formats.

Installation

sudo -H make install

or

python3 -m pip install -Ur dev_requirements.txt
python3 setup.py install

Example Server

from hyper_internal_service import web


async def handle(request):
    name = request.match_info.get("name", "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)


async def wshandle(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str("Hello, {}".format(msg.data))
        elif msg.type == web.WSMsgType.BINARY:
            await ws.send_bytes(msg.data)
        elif msg.type == web.WSMsgType.CLOSE:
            break

    return ws


app = web.Application()
app.add_routes([web.get("/", handle),
                web.get("/echo", wshandle),
                web.get("/{name}", handle)])

web.run_app(app)

Example Client

import asyncio
import hyper_internal_service


async def fetch(session):
    print('Query http://httpbin.org/get')
    async with session.get(
            'http://httpbin.org/get') as resp:
        print(resp.status)
        data = await resp.json()
        print(data)


async def go():
    async with hyper_internal_service.ClientSession() as session:
        await fetch(session)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()